Convert JMS Groovy tests to Java (#9476)

This commit is contained in:
LMarkie 2023-09-18 13:39:34 +01:00 committed by GitHub
parent 27a14e12a8
commit 331aa04e35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 729 additions and 699 deletions

View File

@ -1,317 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.SemanticAttributes
import org.hornetq.api.core.TransportConfiguration
import org.hornetq.api.core.client.HornetQClient
import org.hornetq.api.jms.HornetQJMSClient
import org.hornetq.api.jms.JMSFactoryType
import org.hornetq.core.config.Configuration
import org.hornetq.core.config.CoreQueueConfiguration
import org.hornetq.core.config.impl.ConfigurationImpl
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory
import org.hornetq.core.server.HornetQServer
import org.hornetq.core.server.HornetQServers
import org.hornetq.jms.client.HornetQTextMessage
import spock.lang.Shared
import javax.jms.Message
import javax.jms.MessageListener
import javax.jms.Session
import javax.jms.TextMessage
import java.nio.file.Files
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
class Jms2Test extends AgentInstrumentationSpecification {
@Shared
HornetQServer server
@Shared
String messageText = "a message"
@Shared
Session session
HornetQTextMessage message = session.createTextMessage(messageText)
def setupSpec() {
def tempDir = Files.createTempDirectory("jmsTempDir").toFile()
tempDir.deleteOnExit()
Configuration config = new ConfigurationImpl()
config.bindingsDirectory = tempDir.path
config.journalDirectory = tempDir.path
config.createBindingsDir = false
config.createJournalDir = false
config.securityEnabled = false
config.persistenceEnabled = false
config.setQueueConfigurations([new CoreQueueConfiguration("someQueue", "someQueue", null, true)])
config.setAcceptorConfigurations([new TransportConfiguration(InVMAcceptorFactory.name)].toSet())
server = HornetQServers.newHornetQServer(config)
server.start()
def serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(InVMConnectorFactory.name))
def sf = serverLocator.createSessionFactory()
def clientSession = sf.createSession(false, false, false)
clientSession.createQueue("jms.queue.someQueue", "jms.queue.someQueue", true)
clientSession.createQueue("jms.topic.someTopic", "jms.topic.someTopic", true)
clientSession.close()
sf.close()
serverLocator.close()
def connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
new TransportConfiguration(InVMConnectorFactory.name))
def connection = connectionFactory.createConnection()
connection.start()
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
session.run()
}
def cleanupSpec() {
server.stop()
}
def "sending a message to #destinationName generates spans"() {
setup:
def producer = session.createProducer(destination)
def consumer = session.createConsumer(destination)
runWithSpan("producer parent") {
producer.send(message)
}
TextMessage receivedMessage = runWithSpan("consumer parent") {
return consumer.receive() as TextMessage
}
String messageId = receivedMessage.getJMSMessageID()
expect:
receivedMessage.text == messageText
assertTraces(2) {
SpanData producerSpanData
trace(0, 2) {
span(0) {
name "producer parent"
hasNoParent()
}
producerSpan(it, 1, destinationName, span(0))
producerSpanData = span(1)
}
trace(1, 2) {
span(0) {
name "consumer parent"
hasNoParent()
}
consumerSpan(it, 1, destinationName, messageId, "receive", span(0), producerSpanData)
}
}
cleanup:
producer.close()
consumer.close()
where:
destination | destinationName
session.createQueue("someQueue") | "someQueue"
session.createTopic("someTopic") | "someTopic"
session.createTemporaryQueue() | "(temporary)"
session.createTemporaryTopic() | "(temporary)"
}
def "sending to a MessageListener on #destinationName generates a span"() {
setup:
def lock = new CountDownLatch(1)
def messageRef = new AtomicReference<TextMessage>()
def producer = session.createProducer(destination)
def consumer = session.createConsumer(destination)
consumer.setMessageListener new MessageListener() {
@Override
void onMessage(Message message) {
lock.await() // ensure the producer trace is reported first.
messageRef.set(message as TextMessage)
}
}
runWithSpan("parent") {
producer.send(message)
}
lock.countDown()
expect:
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
hasNoParent()
}
producerSpan(it, 1, destinationName, span(0))
consumerSpan(it, 2, destinationName, messageRef.get().getJMSMessageID(), "process", span(1))
}
}
// This check needs to go after all traces have been accounted for
messageRef.get().text == messageText
cleanup:
producer.close()
consumer.close()
where:
destination | destinationName
session.createQueue("someQueue") | "someQueue"
session.createTopic("someTopic") | "someTopic"
session.createTemporaryQueue() | "(temporary)"
session.createTemporaryTopic() | "(temporary)"
}
def "failing to receive message with receiveNoWait on #destinationName works"() {
setup:
def consumer = session.createConsumer(destination)
// Receive with timeout
Message receivedMessage = consumer.receiveNoWait()
expect:
receivedMessage == null
// span is not created if no message is received
assertTraces(0) {}
cleanup:
consumer.close()
where:
destination | destinationName
session.createQueue("someQueue") | "someQueue"
session.createTopic("someTopic") | "someTopic"
}
def "failing to receive message with wait(timeout) on #destinationName works"() {
setup:
def consumer = session.createConsumer(destination)
// Receive with timeout
Message receivedMessage = consumer.receive(100)
expect:
receivedMessage == null
// span is not created if no message is received
assertTraces(0) {}
cleanup:
consumer.close()
where:
destination | destinationName
session.createQueue("someQueue") | "someQueue"
session.createTopic("someTopic") | "someTopic"
}
def "sending a message to #destinationName with explicit destination propagates context"() {
given:
def producer = session.createProducer(null)
def consumer = session.createConsumer(destination)
def lock = new CountDownLatch(1)
def messageRef = new AtomicReference<TextMessage>()
consumer.setMessageListener new MessageListener() {
@Override
void onMessage(Message message) {
lock.await() // ensure the producer trace is reported first.
messageRef.set(message as TextMessage)
}
}
when:
runWithSpan("parent") {
producer.send(destination, message)
}
lock.countDown()
then:
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
hasNoParent()
}
producerSpan(it, 1, destinationName, span(0))
consumerSpan(it, 2, destinationName, messageRef.get().getJMSMessageID(), "process", span(1))
}
}
// This check needs to go after all traces have been accounted for
messageRef.get().text == messageText
cleanup:
producer.close()
consumer.close()
where:
destination | destinationName
session.createQueue("someQueue") | "someQueue"
session.createTopic("someTopic") | "someTopic"
session.createTemporaryQueue() | "(temporary)"
session.createTemporaryTopic() | "(temporary)"
}
static producerSpan(TraceAssert trace, int index, String destinationName, SpanData parentSpan = null) {
trace.span(index) {
name destinationName + " publish"
kind PRODUCER
if (parentSpan == null) {
hasNoParent()
} else {
childOf(parentSpan)
}
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "jms"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" destinationName
if (destinationName == "(temporary)") {
"$SemanticAttributes.MESSAGING_DESTINATION_TEMPORARY" true
}
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
}
}
}
// passing messageId = null will verify message.id is not captured,
// passing messageId = "" will verify message.id is captured (but won't verify anything about the value),
// any other value for messageId will verify that message.id is captured and has that same value
static consumerSpan(TraceAssert trace, int index, String destinationName, String messageId, String operation, SpanData parentSpan, SpanData linkedSpan = null) {
trace.span(index) {
name destinationName + " " + operation
kind CONSUMER
if (parentSpan == null) {
hasNoParent()
} else {
childOf(parentSpan)
}
if (linkedSpan == null) {
hasNoLinks()
} else {
hasLink(linkedSpan)
}
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "jms"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" destinationName
"$SemanticAttributes.MESSAGING_OPERATION" operation
if (messageId != null) {
//In some tests we don't know exact messageId, so we pass "" and verify just the existence of the attribute
"$SemanticAttributes.MESSAGING_MESSAGE_ID" { it == messageId || messageId == "" }
}
if (destinationName == "(temporary)") {
"$SemanticAttributes.MESSAGING_DESTINATION_TEMPORARY" true
}
}
}
}
}

View File

@ -0,0 +1,320 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.jms.v1_1;
import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
import static io.opentelemetry.api.trace.SpanKind.PRODUCER;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.semconv.SemanticAttributes;
import java.io.File;
import java.nio.file.Files;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.assertj.core.api.AbstractAssert;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
public class Jms2InstrumentationTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create();
static HornetQServer server;
static HornetQConnectionFactory connectionFactory;
static Session session;
static Connection connection;
@BeforeAll
static void setUp() throws Exception {
File tempDir = Files.createTempDirectory("jmsTempDir").toFile();
tempDir.deleteOnExit();
Configuration config = new ConfigurationImpl();
config.setBindingsDirectory(tempDir.getPath());
config.setJournalDirectory(tempDir.getPath());
config.setCreateBindingsDir(false);
config.setCreateJournalDir(false);
config.setSecurityEnabled(false);
config.setPersistenceEnabled(false);
config.setQueueConfigurations(
Collections.singletonList(
new CoreQueueConfiguration("someQueue", "someQueue", null, true)));
config.setAcceptorConfigurations(
new HashSet<>(
Collections.singletonList(
new TransportConfiguration(InVMAcceptorFactory.class.getName()))));
server = HornetQServers.newHornetQServer(config);
server.start();
ServerLocator serverLocator =
HornetQClient.createServerLocatorWithoutHA(
new TransportConfiguration(InVMConnectorFactory.class.getName()));
ClientSessionFactory sf = serverLocator.createSessionFactory();
ClientSession clientSession = sf.createSession(false, false, false);
clientSession.createQueue("jms.queue.someQueue", "jms.queue.someQueue", true);
clientSession.createQueue("jms.topic.someTopic", "jms.topic.someTopic", true);
clientSession.close();
sf.close();
serverLocator.close();
connectionFactory =
HornetQJMSClient.createConnectionFactoryWithoutHA(
JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName()));
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.run();
}
@AfterAll
static void tearDown() throws Exception {
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
if (connectionFactory != null) {
connectionFactory.close();
}
if (server != null) {
server.stop();
}
}
@ArgumentsSource(DestinationsProvider.class)
@ParameterizedTest
void testMessageConsumer(
DestinationFactory destinationFactory, String destinationName, boolean isTemporary)
throws JMSException {
// given
Destination destination = destinationFactory.create(session);
TextMessage sentMessage = session.createTextMessage("a message");
MessageProducer producer = session.createProducer(destination);
cleanup.deferCleanup(producer);
MessageConsumer consumer = session.createConsumer(destination);
cleanup.deferCleanup(consumer);
// when
testing.runWithSpan("producer parent", () -> producer.send(sentMessage));
TextMessage receivedMessage =
testing.runWithSpan("consumer parent", () -> (TextMessage) consumer.receive());
// then
assertThat(receivedMessage.getText()).isEqualTo(sentMessage.getText());
String messageId = receivedMessage.getJMSMessageID();
AtomicReference<SpanData> producerSpan = new AtomicReference<>();
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> span.hasName("producer parent").hasNoParent(),
span ->
span.hasName(destinationName + " publish")
.hasKind(PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary)));
producerSpan.set(trace.getSpan(1));
},
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("consumer parent").hasNoParent(),
span ->
span.hasName(destinationName + " receive")
.hasKind(CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary))));
}
@ArgumentsSource(DestinationsProvider.class)
@ParameterizedTest
void testMessageListener(
DestinationFactory destinationFactory, String destinationName, boolean isTemporary)
throws Exception {
// given
Destination destination = destinationFactory.create(session);
TextMessage sentMessage = session.createTextMessage("a message");
MessageProducer producer = session.createProducer(null);
cleanup.deferCleanup(producer);
MessageConsumer consumer = session.createConsumer(destination);
cleanup.deferCleanup(consumer);
CompletableFuture<TextMessage> receivedMessageFuture = new CompletableFuture<>();
consumer.setMessageListener(
message ->
testing.runWithSpan(
"consumer", () -> receivedMessageFuture.complete((TextMessage) message)));
// when
testing.runWithSpan("producer parent", () -> producer.send(destination, sentMessage));
// then
TextMessage receivedMessage = receivedMessageFuture.get(10, TimeUnit.SECONDS);
assertThat(receivedMessage.getText()).isEqualTo(sentMessage.getText());
String messageId = receivedMessage.getJMSMessageID();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("producer parent").hasNoParent(),
span ->
span.hasName(destinationName + " publish")
.hasKind(PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary)),
span ->
span.hasName(destinationName + " process")
.hasKind(CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary)),
span -> span.hasName("consumer").hasParent(trace.getSpan(2))));
}
@ArgumentsSource(EmptyReceiveArgumentsProvider.class)
@ParameterizedTest
void shouldNotEmitTelemetryOnEmptyReceive(
DestinationFactory destinationFactory, MessageReceiver receiver) throws JMSException {
// given
Destination destination = destinationFactory.create(session);
MessageConsumer consumer = session.createConsumer(destination);
cleanup.deferCleanup(consumer);
// when
Message message = receiver.receive(consumer);
// then
assertThat(message).isNull();
testing.waitForTraces(0);
}
private static AttributeAssertion messagingTempDestination(boolean isTemporary) {
return isTemporary
? equalTo(SemanticAttributes.MESSAGING_DESTINATION_TEMPORARY, true)
: satisfies(SemanticAttributes.MESSAGING_DESTINATION_TEMPORARY, AbstractAssert::isNull);
}
static final class EmptyReceiveArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
DestinationFactory topic = session -> session.createTopic("someTopic");
DestinationFactory queue = session -> session.createQueue("someQueue");
MessageReceiver receive = consumer -> consumer.receive(100);
MessageReceiver receiveNoWait = MessageConsumer::receiveNoWait;
return Stream.of(
arguments(topic, receive),
arguments(queue, receive),
arguments(topic, receiveNoWait),
arguments(queue, receiveNoWait));
}
}
static final class DestinationsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
DestinationFactory topic = session -> session.createTopic("someTopic");
DestinationFactory queue = session -> session.createQueue("someQueue");
DestinationFactory tempTopic = Session::createTemporaryTopic;
DestinationFactory tempQueue = Session::createTemporaryQueue;
return Stream.of(
arguments(topic, "someTopic", false),
arguments(queue, "someQueue", false),
arguments(tempTopic, "(temporary)", true),
arguments(tempQueue, "(temporary)", true));
}
}
@FunctionalInterface
interface DestinationFactory {
Destination create(Session session) throws JMSException;
}
@FunctionalInterface
interface MessageReceiver {
Message receive(MessageConsumer consumer) throws JMSException;
}
}

View File

@ -1,382 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.SemanticAttributes
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.activemq.command.ActiveMQTextMessage
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.output.Slf4jLogConsumer
import spock.lang.Shared
import spock.lang.Unroll
import javax.jms.Connection
import javax.jms.Message
import javax.jms.MessageListener
import javax.jms.Session
import javax.jms.TextMessage
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
@Unroll
class Jms1Test extends AgentInstrumentationSpecification {
private static final Logger logger = LoggerFactory.getLogger(Jms1Test)
private static final GenericContainer broker = new GenericContainer("rmohr/activemq:latest")
.withExposedPorts(61616, 8161)
.withLogConsumer(new Slf4jLogConsumer(logger))
@Shared
String messageText = "a message"
@Shared
Session session
ActiveMQTextMessage message = session.createTextMessage(messageText)
def setupSpec() {
broker.start()
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + broker.getMappedPort(61616))
Connection connection = connectionFactory.createConnection()
connection.start()
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
}
def cleanupSpec() {
broker.stop()
}
def "sending a message to #destinationName generates spans"() {
setup:
def producer = session.createProducer(destination)
def consumer = session.createConsumer(destination)
runWithSpan("producer parent") {
producer.send(message)
}
TextMessage receivedMessage = runWithSpan("consumer parent") {
return consumer.receive() as TextMessage
}
String messageId = receivedMessage.getJMSMessageID()
expect:
receivedMessage.text == messageText
assertTraces(2) {
SpanData producerSpanData
trace(0, 2) {
span(0) {
name "producer parent"
hasNoParent()
}
producerSpan(it, 1, destinationName, span(0))
producerSpanData = span(1)
}
trace(1, 2) {
span(0) {
name "consumer parent"
hasNoParent()
}
consumerSpan(it, 1, destinationName, messageId, "receive", span(0), producerSpanData)
}
}
cleanup:
producer.close()
consumer.close()
where:
destination | destinationName
session.createQueue("someQueue") | "someQueue"
session.createTopic("someTopic") | "someTopic"
session.createTemporaryQueue() | "(temporary)"
session.createTemporaryTopic() | "(temporary)"
}
def "sending to a MessageListener on #destinationName generates a span"() {
setup:
def lock = new CountDownLatch(1)
def messageRef = new AtomicReference<TextMessage>()
def producer = session.createProducer(destination)
def consumer = session.createConsumer(destination)
consumer.setMessageListener new MessageListener() {
@Override
void onMessage(Message message) {
lock.await() // ensure the producer trace is reported first.
messageRef.set(message as TextMessage)
}
}
producer.send(message)
lock.countDown()
expect:
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, destinationName)
consumerSpan(it, 1, destinationName, messageRef.get().getJMSMessageID(), "process", span(0))
}
}
// This check needs to go after all traces have been accounted for
messageRef.get().text == messageText
cleanup:
producer.close()
consumer.close()
where:
destination | destinationName
session.createQueue("someQueue") | "someQueue"
session.createTopic("someTopic") | "someTopic"
session.createTemporaryQueue() | "(temporary)"
session.createTemporaryTopic() | "(temporary)"
}
def "failing to receive message with receiveNoWait on #destinationName works"() {
setup:
def consumer = session.createConsumer(destination)
// Receive with timeout
Message receivedMessage = consumer.receiveNoWait()
expect:
receivedMessage == null
// span is not created if no message is received
assertTraces(0) {}
cleanup:
consumer.close()
where:
destination | destinationName
session.createQueue("someQueue") | "someQueue"
session.createTopic("someTopic") | "someTopic"
}
def "failing to receive message with wait(timeout) on #destinationName works"() {
setup:
def consumer = session.createConsumer(destination)
// Receive with timeout
Message receivedMessage = consumer.receive(100)
expect:
receivedMessage == null
// span is not created if no message is received
assertTraces(0) {}
cleanup:
consumer.close()
where:
destination | destinationName
session.createQueue("someQueue") | "someQueue"
session.createTopic("someTopic") | "someTopic"
}
def "sending a read-only message to #destinationName fails"() {
setup:
def producer = session.createProducer(destination)
def consumer = session.createConsumer(destination)
expect:
!message.isReadOnlyProperties()
when:
message.setReadOnlyProperties(true)
and:
producer.send(message)
TextMessage receivedMessage = consumer.receive() as TextMessage
then:
receivedMessage.text == messageText
// This will result in a logged failure because we tried to
// write properties in MessagePropertyTextMap when readOnlyProperties = true.
// The consumer span will also not be linked to the parent.
assertTraces(2) {
trace(0, 1) {
producerSpan(it, 0, destinationName)
}
trace(1, 1) {
consumerSpan(it, 0, destinationName, "", "receive", null)
}
}
cleanup:
producer.close()
consumer.close()
where:
destination | destinationName
session.createQueue("someQueue") | "someQueue"
session.createTopic("someTopic") | "someTopic"
session.createTemporaryQueue() | "(temporary)"
session.createTemporaryTopic() | "(temporary)"
}
def "sending a message to #destinationName with explicit destination propagates context"() {
given:
def producer = session.createProducer(null)
def consumer = session.createConsumer(destination)
def lock = new CountDownLatch(1)
def messageRef = new AtomicReference<TextMessage>()
consumer.setMessageListener new MessageListener() {
@Override
void onMessage(Message message) {
lock.await() // ensure the producer trace is reported first.
messageRef.set(message as TextMessage)
}
}
when:
runWithSpan("parent") {
producer.send(destination, message)
}
lock.countDown()
then:
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
hasNoParent()
}
producerSpan(it, 1, destinationName, span(0))
consumerSpan(it, 2, destinationName, messageRef.get().getJMSMessageID(), "process", span(1))
}
}
// This check needs to go after all traces have been accounted for
messageRef.get().text == messageText
cleanup:
producer.close()
consumer.close()
where:
destination | destinationName
session.createQueue("someQueue") | "someQueue"
session.createTopic("someTopic") | "someTopic"
session.createTemporaryQueue() | "(temporary)"
session.createTemporaryTopic() | "(temporary)"
}
def "capture message header as span attribute"() {
setup:
def destinationName = "someQueue"
def destination = session.createQueue(destinationName)
def producer = session.createProducer(destination)
def consumer = session.createConsumer(destination)
def message = session.createTextMessage(messageText)
message.setStringProperty("test-message-header", "test")
message.setIntProperty("test-message-int-header", 1234)
runWithSpan("producer parent") {
producer.send(message)
}
TextMessage receivedMessage = runWithSpan("consumer parent") {
return consumer.receive() as TextMessage
}
String messageId = receivedMessage.getJMSMessageID()
expect:
receivedMessage.text == messageText
assertTraces(2) {
SpanData producerSpanData
trace(0, 2) {
span(0) {
name "producer parent"
hasNoParent()
}
producerSpan(it, 1, destinationName, span(0), true)
producerSpanData = span(1)
}
trace(1, 2) {
span(0) {
name "consumer parent"
hasNoParent()
}
consumerSpan(it, 1, destinationName, messageId, "receive", span(0), producerSpanData, true)
}
}
cleanup:
producer.close()
consumer.close()
}
static producerSpan(TraceAssert trace, int index, String destinationName, SpanData parentSpan = null, boolean testHeaders = false) {
trace.span(index) {
name destinationName + " publish"
kind PRODUCER
if (parentSpan == null) {
hasNoParent()
} else {
childOf(parentSpan)
}
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "jms"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" destinationName
if (destinationName == "(temporary)") {
"$SemanticAttributes.MESSAGING_DESTINATION_TEMPORARY" true
}
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
"messaging.header.test_message_int_header" { it == ["1234"] }
}
}
}
}
// passing messageId = null will verify message.id is not captured,
// passing messageId = "" will verify message.id is captured (but won't verify anything about the value),
// any other value for messageId will verify that message.id is captured and has that same value
static consumerSpan(TraceAssert trace, int index, String destinationName, String messageId, String operation, SpanData parentSpan, SpanData linkedSpan = null, boolean testHeaders = false) {
trace.span(index) {
name destinationName + " " + operation
kind CONSUMER
if (parentSpan == null) {
hasNoParent()
} else {
childOf(parentSpan)
}
if (linkedSpan == null) {
hasNoLinks()
} else {
hasLink(linkedSpan)
}
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "jms"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" destinationName
"$SemanticAttributes.MESSAGING_OPERATION" operation
if (messageId != null) {
//In some tests we don't know exact messageId, so we pass "" and verify just the existence of the attribute
"$SemanticAttributes.MESSAGING_MESSAGE_ID" { it == messageId || messageId == "" }
}
if (destinationName == "(temporary)") {
"$SemanticAttributes.MESSAGING_DESTINATION_TEMPORARY" true
}
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
"messaging.header.test_message_int_header" { it == ["1234"] }
}
}
}
}
}

View File

@ -0,0 +1,409 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.jms.v1_1;
import static io.opentelemetry.api.common.AttributeKey.stringArrayKey;
import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
import static io.opentelemetry.api.trace.SpanKind.PRODUCER;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.semconv.SemanticAttributes;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.assertj.core.api.AbstractAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
public class Jms1InstrumentationTest {
static final Logger logger = LoggerFactory.getLogger(Jms1InstrumentationTest.class);
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create();
static GenericContainer<?> broker;
static ActiveMQConnectionFactory connectionFactory;
static Connection connection;
static Session session;
@BeforeAll
static void setUp() throws JMSException {
broker =
new GenericContainer<>("rmohr/activemq:latest")
.withExposedPorts(61616, 8161)
.withLogConsumer(new Slf4jLogConsumer(logger));
broker.start();
connectionFactory =
new ActiveMQConnectionFactory("tcp://localhost:" + broker.getMappedPort(61616));
Connection connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
@AfterAll
static void tearDown() throws JMSException {
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
if (broker != null) {
broker.close();
}
}
@ArgumentsSource(DestinationsProvider.class)
@ParameterizedTest
void testMessageConsumer(
DestinationFactory destinationFactory, String destinationName, boolean isTemporary)
throws JMSException {
// given
Destination destination = destinationFactory.create(session);
TextMessage sentMessage = session.createTextMessage("a message");
MessageProducer producer = session.createProducer(destination);
cleanup.deferCleanup(producer::close);
MessageConsumer consumer = session.createConsumer(destination);
cleanup.deferCleanup(consumer::close);
// when
testing.runWithSpan("producer parent", () -> producer.send(sentMessage));
TextMessage receivedMessage =
testing.runWithSpan("consumer parent", () -> (TextMessage) consumer.receive());
// then
assertThat(receivedMessage.getText()).isEqualTo(sentMessage.getText());
String messageId = receivedMessage.getJMSMessageID();
AtomicReference<SpanData> producerSpan = new AtomicReference<>();
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> span.hasName("producer parent").hasNoParent(),
span ->
span.hasName(destinationName + " publish")
.hasKind(PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary)));
producerSpan.set(trace.getSpan(1));
},
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("consumer parent").hasNoParent(),
span ->
span.hasName(destinationName + " receive")
.hasKind(CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary))));
}
@ArgumentsSource(DestinationsProvider.class)
@ParameterizedTest
void testMessageListener(
DestinationFactory destinationFactory, String destinationName, boolean isTemporary)
throws Exception {
// given
Destination destination = destinationFactory.create(session);
TextMessage sentMessage = session.createTextMessage("a message");
MessageProducer producer = session.createProducer(null);
cleanup.deferCleanup(producer::close);
MessageConsumer consumer = session.createConsumer(destination);
cleanup.deferCleanup(consumer::close);
CompletableFuture<TextMessage> receivedMessageFuture = new CompletableFuture<>();
consumer.setMessageListener(
message ->
testing.runWithSpan(
"consumer", () -> receivedMessageFuture.complete((TextMessage) message)));
// when
testing.runWithSpan("producer parent", () -> producer.send(destination, sentMessage));
// then
TextMessage receivedMessage = receivedMessageFuture.get(10, TimeUnit.SECONDS);
assertThat(receivedMessage.getText()).isEqualTo(sentMessage.getText());
String messageId = receivedMessage.getJMSMessageID();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("producer parent").hasNoParent(),
span ->
span.hasName(destinationName + " publish")
.hasKind(PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary)),
span ->
span.hasName(destinationName + " process")
.hasKind(CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary)),
span -> span.hasName("consumer").hasParent(trace.getSpan(2))));
}
@ArgumentsSource(EmptyReceiveArgumentsProvider.class)
@ParameterizedTest
void shouldNotEmitTelemetryOnEmptyReceive(
DestinationFactory destinationFactory, MessageReceiver receiver) throws JMSException {
// given
Destination destination = destinationFactory.create(session);
MessageConsumer consumer = session.createConsumer(destination);
cleanup.deferCleanup(consumer::close);
// when
Message message = receiver.receive(consumer);
// then
assertThat(message).isNull();
testing.waitForTraces(0);
}
@ArgumentsSource(DestinationsProvider.class)
@ParameterizedTest
void shouldCaptureMessageHeaders(
DestinationFactory destinationFactory, String destinationName, boolean isTemporary)
throws Exception {
// given
Destination destination = destinationFactory.create(session);
TextMessage sentMessage = session.createTextMessage("a message");
sentMessage.setStringProperty("test_message_header", "test");
sentMessage.setIntProperty("test_message_int_header", 1234);
MessageProducer producer = session.createProducer(destination);
cleanup.deferCleanup(producer::close);
MessageConsumer consumer = session.createConsumer(destination);
cleanup.deferCleanup(consumer::close);
CompletableFuture<TextMessage> receivedMessageFuture = new CompletableFuture<>();
consumer.setMessageListener(
message ->
testing.runWithSpan(
"consumer", () -> receivedMessageFuture.complete((TextMessage) message)));
// when
testing.runWithSpan("producer parent", () -> producer.send(sentMessage));
// then
TextMessage receivedMessage = receivedMessageFuture.get(10, TimeUnit.SECONDS);
assertThat(receivedMessage.getText()).isEqualTo(sentMessage.getText());
String messageId = receivedMessage.getJMSMessageID();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("producer parent").hasNoParent(),
span ->
span.hasName(destinationName + " publish")
.hasKind(PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary),
equalTo(
stringArrayKey("messaging.header.test_message_header"),
singletonList("test")),
equalTo(
stringArrayKey("messaging.header.test_message_int_header"),
singletonList("1234"))),
span ->
span.hasName(destinationName + " process")
.hasKind(CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary),
equalTo(
stringArrayKey("messaging.header.test_message_header"),
singletonList("test")),
equalTo(
stringArrayKey("messaging.header.test_message_int_header"),
singletonList("1234"))),
span -> span.hasName("consumer").hasParent(trace.getSpan(2))));
}
@ArgumentsSource(DestinationsProvider.class)
@ParameterizedTest
void shouldFailWhenSendingReadOnlyMessage(
DestinationFactory destinationFactory, String destinationName, boolean isTemporary)
throws Exception {
// given
Destination destination = destinationFactory.create(session);
ActiveMQTextMessage sentMessage = (ActiveMQTextMessage) session.createTextMessage("a message");
MessageProducer producer = session.createProducer(destination);
cleanup.deferCleanup(producer::close);
MessageConsumer consumer = session.createConsumer(destination);
cleanup.deferCleanup(consumer::close);
sentMessage.setReadOnlyProperties(true);
// when
testing.runWithSpan("producer parent", () -> producer.send(sentMessage));
TextMessage receivedMessage = (TextMessage) consumer.receive();
// then
assertThat(receivedMessage.getText()).isEqualTo(sentMessage.getText());
String messageId = receivedMessage.getJMSMessageID();
// This will result in a logged failure because we tried to
// write properties in MessagePropertyTextMap when readOnlyProperties = true.
// As a result, the consumer span will not be linked to the producer span as we are unable to
// propagate the trace context as a message property.
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("producer parent").hasNoParent(),
span ->
span.hasName(destinationName + " publish")
.hasKind(PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary))),
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName(destinationName + " receive")
.hasKind(CONSUMER)
.hasNoParent()
.hasTotalRecordedLinks(0)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary))));
}
private static AttributeAssertion messagingTempDestination(boolean isTemporary) {
return isTemporary
? equalTo(SemanticAttributes.MESSAGING_DESTINATION_TEMPORARY, true)
: satisfies(SemanticAttributes.MESSAGING_DESTINATION_TEMPORARY, AbstractAssert::isNull);
}
static final class EmptyReceiveArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
DestinationFactory topic = session -> session.createTopic("someTopic");
DestinationFactory queue = session -> session.createQueue("someQueue");
MessageReceiver receive = consumer -> consumer.receive(100);
MessageReceiver receiveNoWait = MessageConsumer::receiveNoWait;
return Stream.of(
arguments(topic, receive),
arguments(queue, receive),
arguments(topic, receiveNoWait),
arguments(queue, receiveNoWait));
}
}
static final class DestinationsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
DestinationFactory topic = session -> session.createTopic("someTopic");
DestinationFactory queue = session -> session.createQueue("someQueue");
DestinationFactory tempTopic = Session::createTemporaryTopic;
DestinationFactory tempQueue = Session::createTemporaryQueue;
return Stream.of(
arguments(topic, "someTopic", false),
arguments(queue, "someQueue", false),
arguments(tempTopic, "(temporary)", true),
arguments(tempQueue, "(temporary)", true));
}
}
@FunctionalInterface
interface DestinationFactory {
Destination create(Session session) throws JMSException;
}
@FunctionalInterface
interface MessageReceiver {
Message receive(MessageConsumer consumer) throws JMSException;
}
}