Add tests for Spring JMS Template
This commit is contained in:
parent
74743be6e5
commit
013e57c677
|
@ -35,6 +35,8 @@ dependencies {
|
|||
testCompile group: 'org.apache.activemq', name: 'activemq-pool', version: '5.14.5'
|
||||
testCompile group: 'org.apache.activemq', name: 'activemq-broker', version: '5.14.5'
|
||||
|
||||
testCompile group: 'org.springframework', name: 'spring-jms', version: '4.3.21.RELEASE' // 4.x required for Java 7
|
||||
|
||||
latestDepTestCompile group: 'org.hornetq', name: 'hornetq-jms-client', version: '2.4.7.Final'
|
||||
latestDepTestCompile group: 'org.hornetq', name: 'hornetq-jms-server', version: '2.4.7.Final'
|
||||
}
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import com.google.common.io.Files
|
||||
import datadog.opentracing.DDSpan
|
||||
import datadog.trace.agent.test.AgentTestRunner
|
||||
import datadog.trace.agent.test.asserts.ListWriterAssert
|
||||
import datadog.trace.api.DDSpanTypes
|
||||
|
@ -14,6 +15,7 @@ import org.hornetq.core.config.impl.ConfigurationImpl
|
|||
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory
|
||||
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory
|
||||
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory
|
||||
import org.hornetq.core.server.HornetQServer
|
||||
import org.hornetq.core.server.HornetQServers
|
||||
import org.hornetq.jms.client.HornetQMessageConsumer
|
||||
import org.hornetq.jms.client.HornetQMessageProducer
|
||||
|
@ -28,6 +30,8 @@ import java.util.concurrent.CountDownLatch
|
|||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
class JMS2Test extends AgentTestRunner {
|
||||
@Shared
|
||||
HornetQServer server
|
||||
@Shared
|
||||
String messageText = "a message"
|
||||
@Shared
|
||||
|
@ -50,7 +54,8 @@ class JMS2Test extends AgentTestRunner {
|
|||
config.setAcceptorConfigurations([new TransportConfiguration(NettyAcceptorFactory.name),
|
||||
new TransportConfiguration(InVMAcceptorFactory.name)].toSet())
|
||||
|
||||
HornetQServers.newHornetQServer(config).start()
|
||||
server = HornetQServers.newHornetQServer(config)
|
||||
server.start()
|
||||
|
||||
def serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(InVMConnectorFactory.name))
|
||||
def sf = serverLocator.createSessionFactory()
|
||||
|
@ -70,6 +75,10 @@ class JMS2Test extends AgentTestRunner {
|
|||
session.run()
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
server.stop()
|
||||
}
|
||||
|
||||
def "sending a message to #jmsResourceName generates spans"() {
|
||||
setup:
|
||||
def producer = session.createProducer(destination)
|
||||
|
@ -83,24 +92,7 @@ class JMS2Test extends AgentTestRunner {
|
|||
receivedMessage.text == messageText
|
||||
assertTraces(2) {
|
||||
producerTrace(it, 0, jmsResourceName)
|
||||
trace(1, 1) { // Consumer trace
|
||||
span(0) {
|
||||
childOf TEST_WRITER.firstTrace().get(0)
|
||||
serviceName "jms"
|
||||
operationName "jms.consume"
|
||||
resourceName "Consumed from $jmsResourceName"
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
|
||||
tags {
|
||||
defaultTags(true)
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
|
||||
"${Tags.COMPONENT.key}" "jms"
|
||||
"${Tags.SPAN_KIND.key}" "consumer"
|
||||
"span.origin.type" HornetQMessageConsumer.name
|
||||
}
|
||||
}
|
||||
}
|
||||
consumerTrace(it, 1, jmsResourceName, false, HornetQMessageConsumer)
|
||||
}
|
||||
|
||||
cleanup:
|
||||
|
@ -135,24 +127,7 @@ class JMS2Test extends AgentTestRunner {
|
|||
expect:
|
||||
assertTraces(2) {
|
||||
producerTrace(it, 0, jmsResourceName)
|
||||
trace(1, 1) { // Consumer trace
|
||||
span(0) {
|
||||
childOf TEST_WRITER.firstTrace().get(0)
|
||||
serviceName "jms"
|
||||
operationName "jms.onMessage"
|
||||
resourceName "Received from $jmsResourceName"
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
|
||||
tags {
|
||||
defaultTags(true)
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
|
||||
"${Tags.COMPONENT.key}" "jms"
|
||||
"${Tags.SPAN_KIND.key}" "consumer"
|
||||
"span.origin.type" { t -> t.contains("JMS2Test") }
|
||||
}
|
||||
}
|
||||
}
|
||||
consumerTrace(it, 1, jmsResourceName, true, consumer.messageListener.class)
|
||||
}
|
||||
// This check needs to go after all traces have been accounted for
|
||||
messageRef.get().text == messageText
|
||||
|
@ -247,7 +222,7 @@ class JMS2Test extends AgentTestRunner {
|
|||
session.createTopic("someTopic") | "Topic someTopic"
|
||||
}
|
||||
|
||||
def producerTrace(ListWriterAssert writer, int index, String jmsResourceName) {
|
||||
static producerTrace(ListWriterAssert writer, int index, String jmsResourceName) {
|
||||
writer.trace(index, 1) {
|
||||
span(0) {
|
||||
parent()
|
||||
|
@ -268,22 +243,27 @@ class JMS2Test extends AgentTestRunner {
|
|||
}
|
||||
}
|
||||
|
||||
def consumerTrace(ListWriterAssert writer, int index, String jmsResourceName, origin) {
|
||||
static consumerTrace(ListWriterAssert writer, int index, String jmsResourceName, boolean messageListener, Class origin, DDSpan parentSpan = TEST_WRITER[0][0]) {
|
||||
writer.trace(index, 1) {
|
||||
span(0) {
|
||||
childOf TEST_WRITER.firstTrace().get(2)
|
||||
childOf parentSpan
|
||||
serviceName "jms"
|
||||
operationName "jms.onMessage"
|
||||
resourceName "Received from $jmsResourceName"
|
||||
if (messageListener) {
|
||||
operationName "jms.onMessage"
|
||||
resourceName "Received from $jmsResourceName"
|
||||
} else {
|
||||
operationName "jms.consume"
|
||||
resourceName "Consumed from $jmsResourceName"
|
||||
}
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
|
||||
tags {
|
||||
defaultTags()
|
||||
defaultTags(true)
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
|
||||
"${Tags.COMPONENT.key}" "jms"
|
||||
"${Tags.SPAN_KIND.key}" "consumer"
|
||||
"span.origin.type" origin
|
||||
"span.origin.type" origin.name
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
import com.google.common.io.Files
|
||||
import datadog.trace.agent.test.AgentTestRunner
|
||||
import org.hornetq.api.core.TransportConfiguration
|
||||
import org.hornetq.api.core.client.HornetQClient
|
||||
import org.hornetq.api.jms.HornetQJMSClient
|
||||
import org.hornetq.api.jms.JMSFactoryType
|
||||
import org.hornetq.core.config.Configuration
|
||||
import org.hornetq.core.config.CoreQueueConfiguration
|
||||
import org.hornetq.core.config.impl.ConfigurationImpl
|
||||
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory
|
||||
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory
|
||||
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory
|
||||
import org.hornetq.core.server.HornetQServers
|
||||
import org.hornetq.jms.client.HornetQMessageConsumer
|
||||
import org.springframework.jms.core.JmsTemplate
|
||||
import spock.lang.Shared
|
||||
|
||||
import javax.jms.Session
|
||||
import javax.jms.TextMessage
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import static JMS2Test.consumerTrace
|
||||
import static JMS2Test.producerTrace
|
||||
|
||||
class SpringTemplateJMS2Test extends AgentTestRunner {
|
||||
@Shared
|
||||
String messageText = "a message"
|
||||
@Shared
|
||||
JmsTemplate template
|
||||
@Shared
|
||||
Session session
|
||||
|
||||
def setupSpec() {
|
||||
def tempDir = Files.createTempDir()
|
||||
tempDir.deleteOnExit()
|
||||
|
||||
Configuration config = new ConfigurationImpl()
|
||||
config.bindingsDirectory = tempDir.path
|
||||
config.journalDirectory = tempDir.path
|
||||
config.createBindingsDir = false
|
||||
config.createJournalDir = false
|
||||
config.securityEnabled = false
|
||||
config.persistenceEnabled = false
|
||||
config.setQueueConfigurations([new CoreQueueConfiguration("someQueue", "someQueue", null, true)])
|
||||
config.setAcceptorConfigurations([new TransportConfiguration(NettyAcceptorFactory.name),
|
||||
new TransportConfiguration(InVMAcceptorFactory.name)].toSet())
|
||||
|
||||
HornetQServers.newHornetQServer(config).start()
|
||||
|
||||
def serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(InVMConnectorFactory.name))
|
||||
def sf = serverLocator.createSessionFactory()
|
||||
def clientSession = sf.createSession(false, false, false)
|
||||
clientSession.createQueue("jms.queue.someSpringQueue", "jms.queue.someSpringQueue", true)
|
||||
clientSession.close()
|
||||
sf.close()
|
||||
serverLocator.close()
|
||||
|
||||
def connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
|
||||
new TransportConfiguration(InVMConnectorFactory.name))
|
||||
|
||||
def connection = connectionFactory.createConnection()
|
||||
connection.start()
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
|
||||
session.run()
|
||||
|
||||
template = new JmsTemplate(connectionFactory)
|
||||
template.receiveTimeout = TimeUnit.SECONDS.toMillis(10)
|
||||
}
|
||||
|
||||
def "sending a message to #jmsResourceName generates spans"() {
|
||||
setup:
|
||||
template.convertAndSend(destination, messageText)
|
||||
TextMessage receivedMessage = template.receive(destination)
|
||||
|
||||
expect:
|
||||
receivedMessage.text == messageText
|
||||
assertTraces(2) {
|
||||
producerTrace(it, 0, jmsResourceName)
|
||||
consumerTrace(it, 1, jmsResourceName, false, HornetQMessageConsumer)
|
||||
}
|
||||
|
||||
where:
|
||||
destination | jmsResourceName
|
||||
session.createQueue("someSpringQueue") | "Queue someSpringQueue"
|
||||
}
|
||||
|
||||
def "send and receive message generates spans"() {
|
||||
setup:
|
||||
Thread.start {
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
TextMessage msg = template.receive(destination)
|
||||
assert msg.text == messageText
|
||||
|
||||
// There's a chance this might be reported last, messing up the assertion.
|
||||
template.send(msg.getJMSReplyTo()) {
|
||||
session -> template.getMessageConverter().toMessage("responded!", session)
|
||||
}
|
||||
}
|
||||
TextMessage receivedMessage = template.sendAndReceive(destination) {
|
||||
session -> template.getMessageConverter().toMessage(messageText, session)
|
||||
}
|
||||
|
||||
TEST_WRITER.waitForTraces(4)
|
||||
// Manually reorder if reported in the wrong order.
|
||||
if (TEST_WRITER[3][0].operationName == "jms.produce") {
|
||||
def producerTrace = TEST_WRITER[3]
|
||||
TEST_WRITER[3] = TEST_WRITER[2]
|
||||
TEST_WRITER[2] = producerTrace
|
||||
}
|
||||
|
||||
expect:
|
||||
receivedMessage.text == "responded!"
|
||||
assertTraces(4) {
|
||||
producerTrace(it, 0, jmsResourceName)
|
||||
consumerTrace(it, 1, jmsResourceName, false, HornetQMessageConsumer)
|
||||
producerTrace(it, 2, "Temporary Queue") // receive doesn't propagate the trace, so this is a root
|
||||
consumerTrace(it, 3, "Temporary Queue", false, HornetQMessageConsumer, TEST_WRITER[2][0])
|
||||
}
|
||||
|
||||
where:
|
||||
destination | jmsResourceName
|
||||
session.createQueue("someSpringQueue") | "Queue someSpringQueue"
|
||||
}
|
||||
}
|
|
@ -1,3 +1,4 @@
|
|||
import datadog.opentracing.DDSpan
|
||||
import datadog.trace.agent.test.AgentTestRunner
|
||||
import datadog.trace.agent.test.asserts.ListWriterAssert
|
||||
import datadog.trace.api.DDSpanTypes
|
||||
|
@ -19,6 +20,8 @@ import java.util.concurrent.CountDownLatch
|
|||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
class JMS1Test extends AgentTestRunner {
|
||||
@Shared
|
||||
EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker()
|
||||
@Shared
|
||||
String messageText = "a message"
|
||||
@Shared
|
||||
|
@ -27,7 +30,6 @@ class JMS1Test extends AgentTestRunner {
|
|||
ActiveMQTextMessage message = session.createTextMessage(messageText)
|
||||
|
||||
def setupSpec() {
|
||||
EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker()
|
||||
broker.start()
|
||||
final ActiveMQConnectionFactory connectionFactory = broker.createConnectionFactory()
|
||||
|
||||
|
@ -36,6 +38,10 @@ class JMS1Test extends AgentTestRunner {
|
|||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
broker.stop()
|
||||
}
|
||||
|
||||
def "sending a message to #jmsResourceName generates spans"() {
|
||||
setup:
|
||||
def producer = session.createProducer(destination)
|
||||
|
@ -49,24 +55,7 @@ class JMS1Test extends AgentTestRunner {
|
|||
receivedMessage.text == messageText
|
||||
assertTraces(2) {
|
||||
producerTrace(it, 0, jmsResourceName)
|
||||
trace(1, 1) { // Consumer trace
|
||||
span(0) {
|
||||
childOf TEST_WRITER[0][0]
|
||||
serviceName "jms"
|
||||
operationName "jms.consume"
|
||||
resourceName "Consumed from $jmsResourceName"
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
|
||||
tags {
|
||||
defaultTags(true)
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
|
||||
"${Tags.COMPONENT.key}" "jms"
|
||||
"${Tags.SPAN_KIND.key}" "consumer"
|
||||
"span.origin.type" ActiveMQMessageConsumer.name
|
||||
}
|
||||
}
|
||||
}
|
||||
consumerTrace(it, 1, jmsResourceName, false, ActiveMQMessageConsumer)
|
||||
}
|
||||
|
||||
cleanup:
|
||||
|
@ -101,24 +90,7 @@ class JMS1Test extends AgentTestRunner {
|
|||
expect:
|
||||
assertTraces(2) {
|
||||
producerTrace(it, 0, jmsResourceName)
|
||||
trace(1, 1) { // Consumer trace
|
||||
span(0) {
|
||||
childOf TEST_WRITER[0][0]
|
||||
serviceName "jms"
|
||||
operationName "jms.onMessage"
|
||||
resourceName "Received from $jmsResourceName"
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
|
||||
tags {
|
||||
defaultTags(true)
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
|
||||
"${Tags.COMPONENT.key}" "jms"
|
||||
"${Tags.SPAN_KIND.key}" "consumer"
|
||||
"span.origin.type" { t -> t.contains("JMS1Test") }
|
||||
}
|
||||
}
|
||||
}
|
||||
consumerTrace(it, 1, jmsResourceName, true, consumer.messageListener.class)
|
||||
}
|
||||
// This check needs to go after all traces have been accounted for
|
||||
messageRef.get().text == messageText
|
||||
|
@ -268,15 +240,15 @@ class JMS1Test extends AgentTestRunner {
|
|||
session.createTemporaryTopic() | "Temporary Topic"
|
||||
}
|
||||
|
||||
def producerTrace(ListWriterAssert writer, int index, String jmsResourceName) {
|
||||
static producerTrace(ListWriterAssert writer, int index, String jmsResourceName) {
|
||||
writer.trace(index, 1) {
|
||||
span(0) {
|
||||
parent()
|
||||
serviceName "jms"
|
||||
operationName "jms.produce"
|
||||
resourceName "Produced for $jmsResourceName"
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
parent()
|
||||
|
||||
tags {
|
||||
defaultTags()
|
||||
|
@ -289,22 +261,27 @@ class JMS1Test extends AgentTestRunner {
|
|||
}
|
||||
}
|
||||
|
||||
def consumerTrace(ListWriterAssert writer, int index, String jmsResourceName, origin) {
|
||||
static consumerTrace(ListWriterAssert writer, int index, String jmsResourceName, boolean messageListener, Class origin, DDSpan parentSpan = TEST_WRITER[0][0]) {
|
||||
writer.trace(index, 1) {
|
||||
span(0) {
|
||||
childOf TEST_WRITER[0][0]
|
||||
serviceName "jms"
|
||||
operationName "jms.onMessage"
|
||||
resourceName "Received from $jmsResourceName"
|
||||
if (messageListener) {
|
||||
operationName "jms.onMessage"
|
||||
resourceName "Received from $jmsResourceName"
|
||||
} else {
|
||||
operationName "jms.consume"
|
||||
resourceName "Consumed from $jmsResourceName"
|
||||
}
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
childOf parentSpan
|
||||
|
||||
tags {
|
||||
defaultTags()
|
||||
defaultTags(true)
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
|
||||
"${Tags.COMPONENT.key}" "jms"
|
||||
"${Tags.SPAN_KIND.key}" "consumer"
|
||||
"span.origin.type" origin
|
||||
"span.origin.type" origin.name
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,96 @@
|
|||
import datadog.trace.agent.test.AgentTestRunner
|
||||
import org.apache.activemq.ActiveMQConnectionFactory
|
||||
import org.apache.activemq.ActiveMQMessageConsumer
|
||||
import org.apache.activemq.junit.EmbeddedActiveMQBroker
|
||||
import org.springframework.jms.core.JmsTemplate
|
||||
import spock.lang.Shared
|
||||
|
||||
import javax.jms.Connection
|
||||
import javax.jms.Session
|
||||
import javax.jms.TextMessage
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import static JMS1Test.consumerTrace
|
||||
import static JMS1Test.producerTrace
|
||||
|
||||
class SpringTemplateJMS1Test extends AgentTestRunner {
|
||||
@Shared
|
||||
EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker()
|
||||
@Shared
|
||||
String messageText = "a message"
|
||||
@Shared
|
||||
JmsTemplate template
|
||||
@Shared
|
||||
Session session
|
||||
|
||||
def setupSpec() {
|
||||
broker.start()
|
||||
final ActiveMQConnectionFactory connectionFactory = broker.createConnectionFactory()
|
||||
|
||||
final Connection connection = connectionFactory.createConnection()
|
||||
connection.start()
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
|
||||
|
||||
template = new JmsTemplate(connectionFactory)
|
||||
template.receiveTimeout = TimeUnit.SECONDS.toMillis(10)
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
broker.stop()
|
||||
}
|
||||
|
||||
def "sending a message to #jmsResourceName generates spans"() {
|
||||
setup:
|
||||
template.convertAndSend(destination, messageText)
|
||||
TextMessage receivedMessage = template.receive(destination)
|
||||
|
||||
expect:
|
||||
receivedMessage.text == messageText
|
||||
assertTraces(2) {
|
||||
producerTrace(it, 0, jmsResourceName)
|
||||
consumerTrace(it, 1, jmsResourceName, false, ActiveMQMessageConsumer)
|
||||
}
|
||||
|
||||
where:
|
||||
destination | jmsResourceName
|
||||
session.createQueue("someSpringQueue") | "Queue someSpringQueue"
|
||||
}
|
||||
|
||||
def "send and receive message generates spans"() {
|
||||
setup:
|
||||
Thread.start {
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
TextMessage msg = template.receive(destination)
|
||||
assert msg.text == messageText
|
||||
|
||||
// There's a chance this might be reported last, messing up the assertion.
|
||||
template.send(msg.getJMSReplyTo()) {
|
||||
session -> template.getMessageConverter().toMessage("responded!", session)
|
||||
}
|
||||
}
|
||||
TextMessage receivedMessage = template.sendAndReceive(destination) {
|
||||
session -> template.getMessageConverter().toMessage(messageText, session)
|
||||
}
|
||||
|
||||
TEST_WRITER.waitForTraces(4)
|
||||
// Manually reorder if reported in the wrong order.
|
||||
if (TEST_WRITER[3][0].operationName == "jms.produce") {
|
||||
def producerTrace = TEST_WRITER[3]
|
||||
TEST_WRITER[3] = TEST_WRITER[2]
|
||||
TEST_WRITER[2] = producerTrace
|
||||
}
|
||||
|
||||
expect:
|
||||
receivedMessage.text == "responded!"
|
||||
assertTraces(4) {
|
||||
producerTrace(it, 0, jmsResourceName)
|
||||
consumerTrace(it, 1, jmsResourceName, false, ActiveMQMessageConsumer)
|
||||
producerTrace(it, 2, "Temporary Queue") // receive doesn't propagate the trace, so this is a root
|
||||
consumerTrace(it, 3, "Temporary Queue", false, ActiveMQMessageConsumer, TEST_WRITER[2][0])
|
||||
}
|
||||
|
||||
where:
|
||||
destination | jmsResourceName
|
||||
session.createQueue("someSpringQueue") | "Queue someSpringQueue"
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue