diff --git a/dd-java-agent/instrumentation/jms-2/src/main/java/datadog/trace/instrumentation/jms2/JMS2MessageConsumerInstrumentation.java b/dd-java-agent/instrumentation/jms-2/src/main/java/datadog/trace/instrumentation/jms2/JMS2MessageConsumerInstrumentation.java index 1d58895b9c..87869d9424 100644 --- a/dd-java-agent/instrumentation/jms-2/src/main/java/datadog/trace/instrumentation/jms2/JMS2MessageConsumerInstrumentation.java +++ b/dd-java-agent/instrumentation/jms-2/src/main/java/datadog/trace/instrumentation/jms2/JMS2MessageConsumerInstrumentation.java @@ -22,9 +22,11 @@ import datadog.trace.instrumentation.jms.util.MessagePropertyTextMap; import io.opentracing.Scope; import io.opentracing.Span; import io.opentracing.SpanContext; +import io.opentracing.Tracer; import io.opentracing.propagation.Format; import io.opentracing.tag.Tags; import io.opentracing.util.GlobalTracer; +import java.lang.reflect.Method; import java.util.Collections; import java.util.concurrent.TimeUnit; import javax.jms.Message; @@ -54,7 +56,7 @@ public final class JMS2MessageConsumerInstrumentation extends Instrumenter.Confi .transform( DDAdvice.create() .advice( - named("receive").and(takesArguments(0)).and(isPublic()), + named("receive").and(takesArguments(0).or(takesArguments(1))).and(isPublic()), ConsumerAdvice.class.getName()) .advice( named("receiveNoWait").and(takesArguments(0)).and(isPublic()), @@ -73,31 +75,44 @@ public final class JMS2MessageConsumerInstrumentation extends Instrumenter.Confi public static void stopSpan( @Advice.This final MessageConsumer consumer, @Advice.Enter final long startTime, + @Advice.Origin final Method method, @Advice.Return final Message message, @Advice.Thrown final Throwable throwable) { - - final SpanContext extractedContext = - GlobalTracer.get().extract(Format.Builtin.TEXT_MAP, new MessagePropertyTextMap(message)); - - final Scope scope = + Tracer.SpanBuilder spanBuilder = GlobalTracer.get() .buildSpan("jms.consume") - .asChildOf(extractedContext) .withTag(DDTags.SERVICE_NAME, "jms") .withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER) .withTag(Tags.COMPONENT.getKey(), "jms2") .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER) .withTag("span.origin.type", consumer.getClass().getName()) - .withStartTimestamp(TimeUnit.MILLISECONDS.toMicros(startTime)) - .startActive(true); + .withStartTimestamp(TimeUnit.MILLISECONDS.toMicros(startTime)); + String resourceNamePrefix = "JMS " + method.getName() + ": "; + if (message == null) { + spanBuilder = spanBuilder.withTag(DDTags.RESOURCE_NAME, resourceNamePrefix + "no message"); + } else { + spanBuilder = + spanBuilder.withTag( + DDTags.RESOURCE_NAME, + resourceNamePrefix + "consumed from " + toResourceName(message, null)); + + final SpanContext extractedContext = + GlobalTracer.get() + .extract(Format.Builtin.TEXT_MAP, new MessagePropertyTextMap(message)); + if (extractedContext != null) { + spanBuilder = spanBuilder.asChildOf(extractedContext); + } + } + + final Scope scope = spanBuilder.startActive(true); final Span span = scope.span(); if (throwable != null) { Tags.ERROR.set(span, Boolean.TRUE); span.log(Collections.singletonMap(ERROR_OBJECT, throwable)); } - span.setTag(DDTags.RESOURCE_NAME, "Consumed from " + toResourceName(message, null)); + scope.close(); } } diff --git a/dd-java-agent/instrumentation/jms-2/src/test/groovy/JMS2Test.groovy b/dd-java-agent/instrumentation/jms-2/src/test/groovy/JMS2Test.groovy index 460118b2fb..5b035803c2 100644 --- a/dd-java-agent/instrumentation/jms-2/src/test/groovy/JMS2Test.groovy +++ b/dd-java-agent/instrumentation/jms-2/src/test/groovy/JMS2Test.groovy @@ -1,6 +1,9 @@ import com.google.common.io.Files import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.agent.test.ListWriterAssert import datadog.trace.api.DDSpanTypes +import datadog.trace.api.DDTags +import io.opentracing.tag.Tags import org.hornetq.api.core.TransportConfiguration import org.hornetq.api.core.client.HornetQClient import org.hornetq.api.jms.HornetQJMSClient @@ -23,7 +26,11 @@ import javax.jms.TextMessage import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicReference +import static datadog.trace.agent.test.ListWriterAssert.assertTraces + class JMS2Test extends AgentTestRunner { + @Shared + String messageText = "a message" @Shared static Session session @@ -62,81 +69,53 @@ class JMS2Test extends AgentTestRunner { session.run() } - def "sending a message to #resourceName generates spans"() { + def "sending a message to #jmsResourceName generates spans"() { setup: def producer = session.createProducer(destination) def consumer = session.createConsumer(destination) - def message = session.createTextMessage("a message") + def message = session.createTextMessage(messageText) producer.send(message) TextMessage receivedMessage = consumer.receive() expect: - receivedMessage.text == "a message" - TEST_WRITER.size() == 2 + receivedMessage.text == messageText + assertTraces(TEST_WRITER, 2) { + producerTrace(it, 0, jmsResourceName) + trace(1, 1) { // Consumer trace + span(0) { + childOf TEST_WRITER.firstTrace().get(0) + serviceName "jms" + operationName "jms.consume" + resourceName "JMS receive: consumed from $jmsResourceName" + spanType DDSpanTypes.MESSAGE_PRODUCER + errored false - and: // producer trace - def trace = TEST_WRITER.firstTrace() - trace.size() == 1 - - def producerSpan = trace[0] - - producerSpan.context().operationName == "jms.produce" - producerSpan.serviceName == "jms" - producerSpan.resourceName == "Produced for $resourceName" - producerSpan.type == DDSpanTypes.MESSAGE_PRODUCER - !producerSpan.context().getErrorFlag() - producerSpan.context().parentId == 0 - - - def producerTags = producerSpan.context().tags - producerTags["span.kind"] == "producer" - producerTags["component"] == "jms2" - - producerTags["span.origin.type"] == HornetQMessageProducer.name - - producerTags["thread.name"] != null - producerTags["thread.id"] != null - producerTags.size() == 6 - - and: // consumer trace - def consumerTrace = TEST_WRITER.get(1) - consumerTrace.size() == 1 - - def consumerSpan = consumerTrace[0] - - consumerSpan.context().operationName == "jms.consume" - consumerSpan.serviceName == "jms" - consumerSpan.resourceName == "Consumed from $resourceName" - consumerSpan.type == DDSpanTypes.MESSAGE_CONSUMER - !consumerSpan.context().getErrorFlag() - consumerSpan.context().parentId == producerSpan.context().spanId - - - def consumerTags = consumerSpan.context().tags - consumerTags["span.kind"] == "consumer" - consumerTags["component"] == "jms2" - - consumerTags["span.origin.type"] == HornetQMessageConsumer.name - - consumerTags["thread.name"] != null - consumerTags["thread.id"] != null - consumerTags.size() == 6 + tags { + defaultTags() + "${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER + "${Tags.COMPONENT.key}" "jms2" + "${Tags.SPAN_KIND.key}" "consumer" + "span.origin.type" HornetQMessageConsumer.name + } + } + } + } cleanup: producer.close() consumer.close() where: - destination | resourceName + destination | jmsResourceName session.createQueue("someQueue") | "Queue someQueue" session.createTopic("someTopic") | "Topic someTopic" session.createTemporaryQueue() | "Temporary Queue" session.createTemporaryTopic() | "Temporary Topic" } - def "sending to a MessageListener on #resourceName generates a span"() { + def "sending to a MessageListener on #jmsResourceName generates a span"() { setup: def lock = new CountDownLatch(1) def messageRef = new AtomicReference() @@ -150,72 +129,164 @@ class JMS2Test extends AgentTestRunner { } } - def message = session.createTextMessage("a message") + def message = session.createTextMessage(messageText) producer.send(message) lock.countDown() - TEST_WRITER.waitForTraces(2) expect: - messageRef.get().text == "a message" - TEST_WRITER.size() == 2 + assertTraces(TEST_WRITER, 2) { + producerTrace(it, 0, jmsResourceName) + trace(1, 1) { // Consumer trace + span(0) { + childOf TEST_WRITER.firstTrace().get(0) + serviceName "jms" + operationName "jms.onMessage" + resourceName "Received from $jmsResourceName" + spanType DDSpanTypes.MESSAGE_PRODUCER + errored false - and: // producer trace - def trace = TEST_WRITER.firstTrace() - trace.size() == 1 - - def producerSpan = trace[0] - - producerSpan.context().operationName == "jms.produce" - producerSpan.serviceName == "jms" - producerSpan.resourceName == "Produced for $resourceName" - producerSpan.type == DDSpanTypes.MESSAGE_PRODUCER - !producerSpan.context().getErrorFlag() - producerSpan.context().parentId == 0 - - - def producerTags = producerSpan.context().tags - producerTags["span.kind"] == "producer" - producerTags["component"] == "jms2" - - producerTags["span.origin.type"] == HornetQMessageProducer.name - - producerTags["thread.name"] != null - producerTags["thread.id"] != null - producerTags.size() == 6 - - and: // consumer trace - def consumerTrace = TEST_WRITER.get(1) - consumerTrace.size() == 1 - - def consumerSpan = consumerTrace[0] - - consumerSpan.context().operationName == "jms.onMessage" - consumerSpan.serviceName == "jms" - consumerSpan.resourceName == "Received from $resourceName" - consumerSpan.type == DDSpanTypes.MESSAGE_CONSUMER - !consumerSpan.context().getErrorFlag() - consumerSpan.context().parentId == producerSpan.context().spanId - - - def consumerTags = consumerSpan.context().tags - consumerTags["span.kind"] == "consumer" - consumerTags["component"] == "jms2" - - consumerTags["span.origin.type"] != null - - consumerTags["thread.name"] != null - consumerTags["thread.id"] != null - consumerTags.size() == 6 + tags { + defaultTags() + "${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER + "${Tags.COMPONENT.key}" "jms2" + "${Tags.SPAN_KIND.key}" "consumer" + "span.origin.type" { t -> t.contains("JMS2Test") } + } + } + } + } + // This check needs to go after all traces have been accounted for + messageRef.get().text == messageText cleanup: producer.close() consumer.close() where: - destination | resourceName + destination | jmsResourceName session.createQueue("someQueue") | "Queue someQueue" session.createTopic("someTopic") | "Topic someTopic" session.createTemporaryQueue() | "Temporary Queue" session.createTemporaryTopic() | "Temporary Topic" } + + def "failing to receive message with receiveNoWait on #jmsResourceName works"() { + setup: + def consumer = session.createConsumer(destination) + + // Receive with timeout + TextMessage receivedMessage = consumer.receiveNoWait() + + expect: + receivedMessage == null + assertTraces(TEST_WRITER, 1) { + trace(0, 1) { // Consumer trace + span(0) { + parent() + serviceName "jms" + operationName "jms.consume" + resourceName "JMS receiveNoWait: no message" + spanType DDSpanTypes.MESSAGE_PRODUCER + errored false + + tags { + defaultTags() + "${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER + "${Tags.COMPONENT.key}" "jms2" + "${Tags.SPAN_KIND.key}" "consumer" + "span.origin.type" HornetQMessageConsumer.name + } + } + } + } + + cleanup: + consumer.close() + + where: + destination | jmsResourceName + session.createQueue("someQueue") | "Queue someQueue" + session.createTopic("someTopic") | "Topic someTopic" + } + + def "failing to receive message with wait(timeout) on #jmsResourceName works"() { + setup: + def consumer = session.createConsumer(destination) + + // Receive with timeout + TextMessage receivedMessage = consumer.receive(100) + + expect: + receivedMessage == null + assertTraces(TEST_WRITER, 1) { + trace(0, 1) { // Consumer trace + span(0) { + parent() + serviceName "jms" + operationName "jms.consume" + resourceName "JMS receive: no message" + spanType DDSpanTypes.MESSAGE_PRODUCER + errored false + + tags { + defaultTags() + "${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER + "${Tags.COMPONENT.key}" "jms2" + "${Tags.SPAN_KIND.key}" "consumer" + "span.origin.type" HornetQMessageConsumer.name + } + } + } + } + + cleanup: + consumer.close() + + where: + destination | jmsResourceName + session.createQueue("someQueue") | "Queue someQueue" + session.createTopic("someTopic") | "Topic someTopic" + } + + def producerTrace(ListWriterAssert writer, int index, String jmsResourceName) { + writer.trace(index, 1) { + span(0) { + parent() + serviceName "jms" + operationName "jms.produce" + resourceName "Produced for $jmsResourceName" + spanType DDSpanTypes.MESSAGE_PRODUCER + errored false + + tags { + defaultTags() + "${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_PRODUCER + "${Tags.COMPONENT.key}" "jms2" + "${Tags.SPAN_KIND.key}" "producer" + "span.origin.type" HornetQMessageProducer.name + } + } + } + } + + def consumerTrace(ListWriterAssert writer, int index, String jmsResourceName, origin) { + writer.trace(index, 1) { + span(0) { + childOf TEST_WRITER.firstTrace().get(2) + serviceName "jms" + operationName "jms.onMessage" + resourceName "Received from $jmsResourceName" + spanType DDSpanTypes.MESSAGE_PRODUCER + errored false + + tags { + defaultTags() + "${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER + "${Tags.COMPONENT.key}" "jms2" + "${Tags.SPAN_KIND.key}" "consumer" + "span.origin.type" origin + } + } + } + } }