diff --git a/dd-java-agent/instrumentation/jms/jms.gradle b/dd-java-agent/instrumentation/jms/jms.gradle index 51499df139..2b47a2f724 100644 --- a/dd-java-agent/instrumentation/jms/jms.gradle +++ b/dd-java-agent/instrumentation/jms/jms.gradle @@ -35,6 +35,8 @@ dependencies { testCompile group: 'org.apache.activemq', name: 'activemq-pool', version: '5.14.5' testCompile group: 'org.apache.activemq', name: 'activemq-broker', version: '5.14.5' + testCompile group: 'org.springframework', name: 'spring-jms', version: '4.3.21.RELEASE' // 4.x required for Java 7 + latestDepTestCompile group: 'org.hornetq', name: 'hornetq-jms-client', version: '2.4.7.Final' latestDepTestCompile group: 'org.hornetq', name: 'hornetq-jms-server', version: '2.4.7.Final' } diff --git a/dd-java-agent/instrumentation/jms/src/latestDepTest/groovy/JMS2Test.groovy b/dd-java-agent/instrumentation/jms/src/latestDepTest/groovy/JMS2Test.groovy index f5380fbd30..bbaa56188b 100644 --- a/dd-java-agent/instrumentation/jms/src/latestDepTest/groovy/JMS2Test.groovy +++ b/dd-java-agent/instrumentation/jms/src/latestDepTest/groovy/JMS2Test.groovy @@ -1,4 +1,5 @@ import com.google.common.io.Files +import datadog.opentracing.DDSpan import datadog.trace.agent.test.AgentTestRunner import datadog.trace.agent.test.asserts.ListWriterAssert import datadog.trace.api.DDSpanTypes @@ -14,6 +15,7 @@ import org.hornetq.core.config.impl.ConfigurationImpl import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory +import org.hornetq.core.server.HornetQServer import org.hornetq.core.server.HornetQServers import org.hornetq.jms.client.HornetQMessageConsumer import org.hornetq.jms.client.HornetQMessageProducer @@ -28,6 +30,8 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicReference class JMS2Test extends AgentTestRunner { + @Shared + HornetQServer server @Shared String messageText = "a message" @Shared @@ -50,7 +54,8 @@ class JMS2Test extends AgentTestRunner { config.setAcceptorConfigurations([new TransportConfiguration(NettyAcceptorFactory.name), new TransportConfiguration(InVMAcceptorFactory.name)].toSet()) - HornetQServers.newHornetQServer(config).start() + server = HornetQServers.newHornetQServer(config) + server.start() def serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(InVMConnectorFactory.name)) def sf = serverLocator.createSessionFactory() @@ -70,6 +75,10 @@ class JMS2Test extends AgentTestRunner { session.run() } + def cleanupSpec() { + server.stop() + } + def "sending a message to #jmsResourceName generates spans"() { setup: def producer = session.createProducer(destination) @@ -83,24 +92,7 @@ class JMS2Test extends AgentTestRunner { receivedMessage.text == messageText assertTraces(2) { producerTrace(it, 0, jmsResourceName) - trace(1, 1) { // Consumer trace - span(0) { - childOf TEST_WRITER.firstTrace().get(0) - serviceName "jms" - operationName "jms.consume" - resourceName "Consumed from $jmsResourceName" - spanType DDSpanTypes.MESSAGE_PRODUCER - errored false - - tags { - defaultTags(true) - "${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER - "${Tags.COMPONENT.key}" "jms" - "${Tags.SPAN_KIND.key}" "consumer" - "span.origin.type" HornetQMessageConsumer.name - } - } - } + consumerTrace(it, 1, jmsResourceName, false, HornetQMessageConsumer) } cleanup: @@ -135,24 +127,7 @@ class JMS2Test extends AgentTestRunner { expect: assertTraces(2) { producerTrace(it, 0, jmsResourceName) - trace(1, 1) { // Consumer trace - span(0) { - childOf TEST_WRITER.firstTrace().get(0) - serviceName "jms" - operationName "jms.onMessage" - resourceName "Received from $jmsResourceName" - spanType DDSpanTypes.MESSAGE_PRODUCER - errored false - - tags { - defaultTags(true) - "${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER - "${Tags.COMPONENT.key}" "jms" - "${Tags.SPAN_KIND.key}" "consumer" - "span.origin.type" { t -> t.contains("JMS2Test") } - } - } - } + consumerTrace(it, 1, jmsResourceName, true, consumer.messageListener.class) } // This check needs to go after all traces have been accounted for messageRef.get().text == messageText @@ -247,7 +222,7 @@ class JMS2Test extends AgentTestRunner { session.createTopic("someTopic") | "Topic someTopic" } - def producerTrace(ListWriterAssert writer, int index, String jmsResourceName) { + static producerTrace(ListWriterAssert writer, int index, String jmsResourceName) { writer.trace(index, 1) { span(0) { parent() @@ -268,22 +243,27 @@ class JMS2Test extends AgentTestRunner { } } - def consumerTrace(ListWriterAssert writer, int index, String jmsResourceName, origin) { + static consumerTrace(ListWriterAssert writer, int index, String jmsResourceName, boolean messageListener, Class origin, DDSpan parentSpan = TEST_WRITER[0][0]) { writer.trace(index, 1) { span(0) { - childOf TEST_WRITER.firstTrace().get(2) + childOf parentSpan serviceName "jms" - operationName "jms.onMessage" - resourceName "Received from $jmsResourceName" + if (messageListener) { + operationName "jms.onMessage" + resourceName "Received from $jmsResourceName" + } else { + operationName "jms.consume" + resourceName "Consumed from $jmsResourceName" + } spanType DDSpanTypes.MESSAGE_PRODUCER errored false tags { - defaultTags() + defaultTags(true) "${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER "${Tags.COMPONENT.key}" "jms" "${Tags.SPAN_KIND.key}" "consumer" - "span.origin.type" origin + "span.origin.type" origin.name } } } diff --git a/dd-java-agent/instrumentation/jms/src/latestDepTest/groovy/SpringTemplateJMS2Test.groovy b/dd-java-agent/instrumentation/jms/src/latestDepTest/groovy/SpringTemplateJMS2Test.groovy new file mode 100644 index 0000000000..d541b7c410 --- /dev/null +++ b/dd-java-agent/instrumentation/jms/src/latestDepTest/groovy/SpringTemplateJMS2Test.groovy @@ -0,0 +1,124 @@ +import com.google.common.io.Files +import datadog.trace.agent.test.AgentTestRunner +import org.hornetq.api.core.TransportConfiguration +import org.hornetq.api.core.client.HornetQClient +import org.hornetq.api.jms.HornetQJMSClient +import org.hornetq.api.jms.JMSFactoryType +import org.hornetq.core.config.Configuration +import org.hornetq.core.config.CoreQueueConfiguration +import org.hornetq.core.config.impl.ConfigurationImpl +import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory +import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory +import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory +import org.hornetq.core.server.HornetQServers +import org.hornetq.jms.client.HornetQMessageConsumer +import org.springframework.jms.core.JmsTemplate +import spock.lang.Shared + +import javax.jms.Session +import javax.jms.TextMessage +import java.util.concurrent.TimeUnit + +import static JMS2Test.consumerTrace +import static JMS2Test.producerTrace + +class SpringTemplateJMS2Test extends AgentTestRunner { + @Shared + String messageText = "a message" + @Shared + JmsTemplate template + @Shared + Session session + + def setupSpec() { + def tempDir = Files.createTempDir() + tempDir.deleteOnExit() + + Configuration config = new ConfigurationImpl() + config.bindingsDirectory = tempDir.path + config.journalDirectory = tempDir.path + config.createBindingsDir = false + config.createJournalDir = false + config.securityEnabled = false + config.persistenceEnabled = false + config.setQueueConfigurations([new CoreQueueConfiguration("someQueue", "someQueue", null, true)]) + config.setAcceptorConfigurations([new TransportConfiguration(NettyAcceptorFactory.name), + new TransportConfiguration(InVMAcceptorFactory.name)].toSet()) + + HornetQServers.newHornetQServer(config).start() + + def serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(InVMConnectorFactory.name)) + def sf = serverLocator.createSessionFactory() + def clientSession = sf.createSession(false, false, false) + clientSession.createQueue("jms.queue.someSpringQueue", "jms.queue.someSpringQueue", true) + clientSession.close() + sf.close() + serverLocator.close() + + def connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, + new TransportConfiguration(InVMConnectorFactory.name)) + + def connection = connectionFactory.createConnection() + connection.start() + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + session.run() + + template = new JmsTemplate(connectionFactory) + template.receiveTimeout = TimeUnit.SECONDS.toMillis(10) + } + + def "sending a message to #jmsResourceName generates spans"() { + setup: + template.convertAndSend(destination, messageText) + TextMessage receivedMessage = template.receive(destination) + + expect: + receivedMessage.text == messageText + assertTraces(2) { + producerTrace(it, 0, jmsResourceName) + consumerTrace(it, 1, jmsResourceName, false, HornetQMessageConsumer) + } + + where: + destination | jmsResourceName + session.createQueue("someSpringQueue") | "Queue someSpringQueue" + } + + def "send and receive message generates spans"() { + setup: + Thread.start { + TEST_WRITER.waitForTraces(1) + TextMessage msg = template.receive(destination) + assert msg.text == messageText + + // There's a chance this might be reported last, messing up the assertion. + template.send(msg.getJMSReplyTo()) { + session -> template.getMessageConverter().toMessage("responded!", session) + } + } + TextMessage receivedMessage = template.sendAndReceive(destination) { + session -> template.getMessageConverter().toMessage(messageText, session) + } + + TEST_WRITER.waitForTraces(4) + // Manually reorder if reported in the wrong order. + if (TEST_WRITER[3][0].operationName == "jms.produce") { + def producerTrace = TEST_WRITER[3] + TEST_WRITER[3] = TEST_WRITER[2] + TEST_WRITER[2] = producerTrace + } + + expect: + receivedMessage.text == "responded!" + assertTraces(4) { + producerTrace(it, 0, jmsResourceName) + consumerTrace(it, 1, jmsResourceName, false, HornetQMessageConsumer) + producerTrace(it, 2, "Temporary Queue") // receive doesn't propagate the trace, so this is a root + consumerTrace(it, 3, "Temporary Queue", false, HornetQMessageConsumer, TEST_WRITER[2][0]) + } + + where: + destination | jmsResourceName + session.createQueue("someSpringQueue") | "Queue someSpringQueue" + } +} diff --git a/dd-java-agent/instrumentation/jms/src/test/groovy/JMS1Test.groovy b/dd-java-agent/instrumentation/jms/src/test/groovy/JMS1Test.groovy index a0bbab0b57..1b47d5a1f6 100644 --- a/dd-java-agent/instrumentation/jms/src/test/groovy/JMS1Test.groovy +++ b/dd-java-agent/instrumentation/jms/src/test/groovy/JMS1Test.groovy @@ -1,3 +1,4 @@ +import datadog.opentracing.DDSpan import datadog.trace.agent.test.AgentTestRunner import datadog.trace.agent.test.asserts.ListWriterAssert import datadog.trace.api.DDSpanTypes @@ -19,6 +20,8 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicReference class JMS1Test extends AgentTestRunner { + @Shared + EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker() @Shared String messageText = "a message" @Shared @@ -27,7 +30,6 @@ class JMS1Test extends AgentTestRunner { ActiveMQTextMessage message = session.createTextMessage(messageText) def setupSpec() { - EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker() broker.start() final ActiveMQConnectionFactory connectionFactory = broker.createConnectionFactory() @@ -36,6 +38,10 @@ class JMS1Test extends AgentTestRunner { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) } + def cleanupSpec() { + broker.stop() + } + def "sending a message to #jmsResourceName generates spans"() { setup: def producer = session.createProducer(destination) @@ -49,24 +55,7 @@ class JMS1Test extends AgentTestRunner { receivedMessage.text == messageText assertTraces(2) { producerTrace(it, 0, jmsResourceName) - trace(1, 1) { // Consumer trace - span(0) { - childOf TEST_WRITER[0][0] - serviceName "jms" - operationName "jms.consume" - resourceName "Consumed from $jmsResourceName" - spanType DDSpanTypes.MESSAGE_PRODUCER - errored false - - tags { - defaultTags(true) - "${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER - "${Tags.COMPONENT.key}" "jms" - "${Tags.SPAN_KIND.key}" "consumer" - "span.origin.type" ActiveMQMessageConsumer.name - } - } - } + consumerTrace(it, 1, jmsResourceName, false, ActiveMQMessageConsumer) } cleanup: @@ -101,24 +90,7 @@ class JMS1Test extends AgentTestRunner { expect: assertTraces(2) { producerTrace(it, 0, jmsResourceName) - trace(1, 1) { // Consumer trace - span(0) { - childOf TEST_WRITER[0][0] - serviceName "jms" - operationName "jms.onMessage" - resourceName "Received from $jmsResourceName" - spanType DDSpanTypes.MESSAGE_PRODUCER - errored false - - tags { - defaultTags(true) - "${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER - "${Tags.COMPONENT.key}" "jms" - "${Tags.SPAN_KIND.key}" "consumer" - "span.origin.type" { t -> t.contains("JMS1Test") } - } - } - } + consumerTrace(it, 1, jmsResourceName, true, consumer.messageListener.class) } // This check needs to go after all traces have been accounted for messageRef.get().text == messageText @@ -268,15 +240,15 @@ class JMS1Test extends AgentTestRunner { session.createTemporaryTopic() | "Temporary Topic" } - def producerTrace(ListWriterAssert writer, int index, String jmsResourceName) { + static producerTrace(ListWriterAssert writer, int index, String jmsResourceName) { writer.trace(index, 1) { span(0) { - parent() serviceName "jms" operationName "jms.produce" resourceName "Produced for $jmsResourceName" spanType DDSpanTypes.MESSAGE_PRODUCER errored false + parent() tags { defaultTags() @@ -289,22 +261,27 @@ class JMS1Test extends AgentTestRunner { } } - def consumerTrace(ListWriterAssert writer, int index, String jmsResourceName, origin) { + static consumerTrace(ListWriterAssert writer, int index, String jmsResourceName, boolean messageListener, Class origin, DDSpan parentSpan = TEST_WRITER[0][0]) { writer.trace(index, 1) { span(0) { - childOf TEST_WRITER[0][0] serviceName "jms" - operationName "jms.onMessage" - resourceName "Received from $jmsResourceName" + if (messageListener) { + operationName "jms.onMessage" + resourceName "Received from $jmsResourceName" + } else { + operationName "jms.consume" + resourceName "Consumed from $jmsResourceName" + } spanType DDSpanTypes.MESSAGE_PRODUCER errored false + childOf parentSpan tags { - defaultTags() + defaultTags(true) "${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER "${Tags.COMPONENT.key}" "jms" "${Tags.SPAN_KIND.key}" "consumer" - "span.origin.type" origin + "span.origin.type" origin.name } } } diff --git a/dd-java-agent/instrumentation/jms/src/test/groovy/SpringTemplateJMS1Test.groovy b/dd-java-agent/instrumentation/jms/src/test/groovy/SpringTemplateJMS1Test.groovy new file mode 100644 index 0000000000..f285d4de3f --- /dev/null +++ b/dd-java-agent/instrumentation/jms/src/test/groovy/SpringTemplateJMS1Test.groovy @@ -0,0 +1,96 @@ +import datadog.trace.agent.test.AgentTestRunner +import org.apache.activemq.ActiveMQConnectionFactory +import org.apache.activemq.ActiveMQMessageConsumer +import org.apache.activemq.junit.EmbeddedActiveMQBroker +import org.springframework.jms.core.JmsTemplate +import spock.lang.Shared + +import javax.jms.Connection +import javax.jms.Session +import javax.jms.TextMessage +import java.util.concurrent.TimeUnit + +import static JMS1Test.consumerTrace +import static JMS1Test.producerTrace + +class SpringTemplateJMS1Test extends AgentTestRunner { + @Shared + EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker() + @Shared + String messageText = "a message" + @Shared + JmsTemplate template + @Shared + Session session + + def setupSpec() { + broker.start() + final ActiveMQConnectionFactory connectionFactory = broker.createConnectionFactory() + + final Connection connection = connectionFactory.createConnection() + connection.start() + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + + template = new JmsTemplate(connectionFactory) + template.receiveTimeout = TimeUnit.SECONDS.toMillis(10) + } + + def cleanupSpec() { + broker.stop() + } + + def "sending a message to #jmsResourceName generates spans"() { + setup: + template.convertAndSend(destination, messageText) + TextMessage receivedMessage = template.receive(destination) + + expect: + receivedMessage.text == messageText + assertTraces(2) { + producerTrace(it, 0, jmsResourceName) + consumerTrace(it, 1, jmsResourceName, false, ActiveMQMessageConsumer) + } + + where: + destination | jmsResourceName + session.createQueue("someSpringQueue") | "Queue someSpringQueue" + } + + def "send and receive message generates spans"() { + setup: + Thread.start { + TEST_WRITER.waitForTraces(1) + TextMessage msg = template.receive(destination) + assert msg.text == messageText + + // There's a chance this might be reported last, messing up the assertion. + template.send(msg.getJMSReplyTo()) { + session -> template.getMessageConverter().toMessage("responded!", session) + } + } + TextMessage receivedMessage = template.sendAndReceive(destination) { + session -> template.getMessageConverter().toMessage(messageText, session) + } + + TEST_WRITER.waitForTraces(4) + // Manually reorder if reported in the wrong order. + if (TEST_WRITER[3][0].operationName == "jms.produce") { + def producerTrace = TEST_WRITER[3] + TEST_WRITER[3] = TEST_WRITER[2] + TEST_WRITER[2] = producerTrace + } + + expect: + receivedMessage.text == "responded!" + assertTraces(4) { + producerTrace(it, 0, jmsResourceName) + consumerTrace(it, 1, jmsResourceName, false, ActiveMQMessageConsumer) + producerTrace(it, 2, "Temporary Queue") // receive doesn't propagate the trace, so this is a root + consumerTrace(it, 3, "Temporary Queue", false, ActiveMQMessageConsumer, TEST_WRITER[2][0]) + } + + where: + destination | jmsResourceName + session.createQueue("someSpringQueue") | "Queue someSpringQueue" + } +}