From 331aa04e351b46c57181fd28d0a3e6aabd15d880 Mon Sep 17 00:00:00 2001 From: LMarkie Date: Mon, 18 Sep 2023 13:39:34 +0100 Subject: [PATCH] Convert JMS Groovy tests to Java (#9476) --- .../src/jms2Test/groovy/Jms2Test.groovy | 317 -------------- .../jms/v1_1/Jms2InstrumentationTest.java | 320 ++++++++++++++ .../javaagent/src/test/groovy/Jms1Test.groovy | 382 ---------------- .../jms/v1_1/Jms1InstrumentationTest.java | 409 ++++++++++++++++++ 4 files changed, 729 insertions(+), 699 deletions(-) delete mode 100644 instrumentation/jms/jms-1.1/javaagent/src/jms2Test/groovy/Jms2Test.groovy create mode 100644 instrumentation/jms/jms-1.1/javaagent/src/jms2Test/java/io/opentelemetry/javaagent/instrumentation/jms/v1_1/Jms2InstrumentationTest.java delete mode 100644 instrumentation/jms/jms-1.1/javaagent/src/test/groovy/Jms1Test.groovy create mode 100644 instrumentation/jms/jms-1.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v1_1/Jms1InstrumentationTest.java diff --git a/instrumentation/jms/jms-1.1/javaagent/src/jms2Test/groovy/Jms2Test.groovy b/instrumentation/jms/jms-1.1/javaagent/src/jms2Test/groovy/Jms2Test.groovy deleted file mode 100644 index c36d674e3e..0000000000 --- a/instrumentation/jms/jms-1.1/javaagent/src/jms2Test/groovy/Jms2Test.groovy +++ /dev/null @@ -1,317 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import io.opentelemetry.instrumentation.test.asserts.TraceAssert -import io.opentelemetry.sdk.trace.data.SpanData -import io.opentelemetry.semconv.SemanticAttributes -import org.hornetq.api.core.TransportConfiguration -import org.hornetq.api.core.client.HornetQClient -import org.hornetq.api.jms.HornetQJMSClient -import org.hornetq.api.jms.JMSFactoryType -import org.hornetq.core.config.Configuration -import org.hornetq.core.config.CoreQueueConfiguration -import org.hornetq.core.config.impl.ConfigurationImpl -import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory -import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory -import org.hornetq.core.server.HornetQServer -import org.hornetq.core.server.HornetQServers -import org.hornetq.jms.client.HornetQTextMessage -import spock.lang.Shared - -import javax.jms.Message -import javax.jms.MessageListener -import javax.jms.Session -import javax.jms.TextMessage -import java.nio.file.Files -import java.util.concurrent.CountDownLatch -import java.util.concurrent.atomic.AtomicReference - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import static io.opentelemetry.api.trace.SpanKind.PRODUCER - -class Jms2Test extends AgentInstrumentationSpecification { - @Shared - HornetQServer server - @Shared - String messageText = "a message" - @Shared - Session session - - HornetQTextMessage message = session.createTextMessage(messageText) - - def setupSpec() { - def tempDir = Files.createTempDirectory("jmsTempDir").toFile() - tempDir.deleteOnExit() - - Configuration config = new ConfigurationImpl() - config.bindingsDirectory = tempDir.path - config.journalDirectory = tempDir.path - config.createBindingsDir = false - config.createJournalDir = false - config.securityEnabled = false - config.persistenceEnabled = false - config.setQueueConfigurations([new CoreQueueConfiguration("someQueue", "someQueue", null, true)]) - config.setAcceptorConfigurations([new TransportConfiguration(InVMAcceptorFactory.name)].toSet()) - - server = HornetQServers.newHornetQServer(config) - server.start() - - def serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(InVMConnectorFactory.name)) - def sf = serverLocator.createSessionFactory() - def clientSession = sf.createSession(false, false, false) - clientSession.createQueue("jms.queue.someQueue", "jms.queue.someQueue", true) - clientSession.createQueue("jms.topic.someTopic", "jms.topic.someTopic", true) - clientSession.close() - sf.close() - serverLocator.close() - - def connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, - new TransportConfiguration(InVMConnectorFactory.name)) - - def connection = connectionFactory.createConnection() - connection.start() - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) - session.run() - } - - def cleanupSpec() { - server.stop() - } - - def "sending a message to #destinationName generates spans"() { - setup: - def producer = session.createProducer(destination) - def consumer = session.createConsumer(destination) - - runWithSpan("producer parent") { - producer.send(message) - } - - TextMessage receivedMessage = runWithSpan("consumer parent") { - return consumer.receive() as TextMessage - } - String messageId = receivedMessage.getJMSMessageID() - - expect: - receivedMessage.text == messageText - assertTraces(2) { - SpanData producerSpanData - trace(0, 2) { - span(0) { - name "producer parent" - hasNoParent() - } - producerSpan(it, 1, destinationName, span(0)) - - producerSpanData = span(1) - } - trace(1, 2) { - span(0) { - name "consumer parent" - hasNoParent() - } - consumerSpan(it, 1, destinationName, messageId, "receive", span(0), producerSpanData) - } - } - - cleanup: - producer.close() - consumer.close() - - where: - destination | destinationName - session.createQueue("someQueue") | "someQueue" - session.createTopic("someTopic") | "someTopic" - session.createTemporaryQueue() | "(temporary)" - session.createTemporaryTopic() | "(temporary)" - } - - def "sending to a MessageListener on #destinationName generates a span"() { - setup: - def lock = new CountDownLatch(1) - def messageRef = new AtomicReference() - def producer = session.createProducer(destination) - def consumer = session.createConsumer(destination) - consumer.setMessageListener new MessageListener() { - @Override - void onMessage(Message message) { - lock.await() // ensure the producer trace is reported first. - messageRef.set(message as TextMessage) - } - } - - runWithSpan("parent") { - producer.send(message) - } - lock.countDown() - - expect: - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - hasNoParent() - } - producerSpan(it, 1, destinationName, span(0)) - consumerSpan(it, 2, destinationName, messageRef.get().getJMSMessageID(), "process", span(1)) - } - } - // This check needs to go after all traces have been accounted for - messageRef.get().text == messageText - - cleanup: - producer.close() - consumer.close() - - where: - destination | destinationName - session.createQueue("someQueue") | "someQueue" - session.createTopic("someTopic") | "someTopic" - session.createTemporaryQueue() | "(temporary)" - session.createTemporaryTopic() | "(temporary)" - } - - def "failing to receive message with receiveNoWait on #destinationName works"() { - setup: - def consumer = session.createConsumer(destination) - - // Receive with timeout - Message receivedMessage = consumer.receiveNoWait() - - expect: - receivedMessage == null - // span is not created if no message is received - assertTraces(0) {} - - cleanup: - consumer.close() - - where: - destination | destinationName - session.createQueue("someQueue") | "someQueue" - session.createTopic("someTopic") | "someTopic" - } - - def "failing to receive message with wait(timeout) on #destinationName works"() { - setup: - def consumer = session.createConsumer(destination) - - // Receive with timeout - Message receivedMessage = consumer.receive(100) - - expect: - receivedMessage == null - // span is not created if no message is received - assertTraces(0) {} - - cleanup: - consumer.close() - - where: - destination | destinationName - session.createQueue("someQueue") | "someQueue" - session.createTopic("someTopic") | "someTopic" - } - - def "sending a message to #destinationName with explicit destination propagates context"() { - given: - def producer = session.createProducer(null) - def consumer = session.createConsumer(destination) - - def lock = new CountDownLatch(1) - def messageRef = new AtomicReference() - consumer.setMessageListener new MessageListener() { - @Override - void onMessage(Message message) { - lock.await() // ensure the producer trace is reported first. - messageRef.set(message as TextMessage) - } - } - - when: - runWithSpan("parent") { - producer.send(destination, message) - } - lock.countDown() - - then: - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - hasNoParent() - } - producerSpan(it, 1, destinationName, span(0)) - consumerSpan(it, 2, destinationName, messageRef.get().getJMSMessageID(), "process", span(1)) - } - } - // This check needs to go after all traces have been accounted for - messageRef.get().text == messageText - - cleanup: - producer.close() - consumer.close() - - where: - destination | destinationName - session.createQueue("someQueue") | "someQueue" - session.createTopic("someTopic") | "someTopic" - session.createTemporaryQueue() | "(temporary)" - session.createTemporaryTopic() | "(temporary)" - } - - static producerSpan(TraceAssert trace, int index, String destinationName, SpanData parentSpan = null) { - trace.span(index) { - name destinationName + " publish" - kind PRODUCER - if (parentSpan == null) { - hasNoParent() - } else { - childOf(parentSpan) - } - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "jms" - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" destinationName - if (destinationName == "(temporary)") { - "$SemanticAttributes.MESSAGING_DESTINATION_TEMPORARY" true - } - "$SemanticAttributes.MESSAGING_MESSAGE_ID" String - } - } - } - - // passing messageId = null will verify message.id is not captured, - // passing messageId = "" will verify message.id is captured (but won't verify anything about the value), - // any other value for messageId will verify that message.id is captured and has that same value - static consumerSpan(TraceAssert trace, int index, String destinationName, String messageId, String operation, SpanData parentSpan, SpanData linkedSpan = null) { - trace.span(index) { - name destinationName + " " + operation - kind CONSUMER - if (parentSpan == null) { - hasNoParent() - } else { - childOf(parentSpan) - } - if (linkedSpan == null) { - hasNoLinks() - } else { - hasLink(linkedSpan) - } - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "jms" - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" destinationName - "$SemanticAttributes.MESSAGING_OPERATION" operation - if (messageId != null) { - //In some tests we don't know exact messageId, so we pass "" and verify just the existence of the attribute - "$SemanticAttributes.MESSAGING_MESSAGE_ID" { it == messageId || messageId == "" } - } - if (destinationName == "(temporary)") { - "$SemanticAttributes.MESSAGING_DESTINATION_TEMPORARY" true - } - } - } - } -} diff --git a/instrumentation/jms/jms-1.1/javaagent/src/jms2Test/java/io/opentelemetry/javaagent/instrumentation/jms/v1_1/Jms2InstrumentationTest.java b/instrumentation/jms/jms-1.1/javaagent/src/jms2Test/java/io/opentelemetry/javaagent/instrumentation/jms/v1_1/Jms2InstrumentationTest.java new file mode 100644 index 0000000000..bf7e6d1bf0 --- /dev/null +++ b/instrumentation/jms/jms-1.1/javaagent/src/jms2Test/java/io/opentelemetry/javaagent/instrumentation/jms/v1_1/Jms2InstrumentationTest.java @@ -0,0 +1,320 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.jms.v1_1; + +import static io.opentelemetry.api.trace.SpanKind.CONSUMER; +import static io.opentelemetry.api.trace.SpanKind.PRODUCER; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.semconv.SemanticAttributes; +import java.io.File; +import java.nio.file.Files; +import java.util.Collections; +import java.util.HashSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.assertj.core.api.AbstractAssert; +import org.hornetq.api.core.TransportConfiguration; +import org.hornetq.api.core.client.ClientSession; +import org.hornetq.api.core.client.ClientSessionFactory; +import org.hornetq.api.core.client.HornetQClient; +import org.hornetq.api.core.client.ServerLocator; +import org.hornetq.api.jms.HornetQJMSClient; +import org.hornetq.api.jms.JMSFactoryType; +import org.hornetq.core.config.Configuration; +import org.hornetq.core.config.CoreQueueConfiguration; +import org.hornetq.core.config.impl.ConfigurationImpl; +import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory; +import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory; +import org.hornetq.core.server.HornetQServer; +import org.hornetq.core.server.HornetQServers; +import org.hornetq.jms.client.HornetQConnectionFactory; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; + +public class Jms2InstrumentationTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create(); + + static HornetQServer server; + static HornetQConnectionFactory connectionFactory; + static Session session; + static Connection connection; + + @BeforeAll + static void setUp() throws Exception { + File tempDir = Files.createTempDirectory("jmsTempDir").toFile(); + tempDir.deleteOnExit(); + + Configuration config = new ConfigurationImpl(); + config.setBindingsDirectory(tempDir.getPath()); + config.setJournalDirectory(tempDir.getPath()); + config.setCreateBindingsDir(false); + config.setCreateJournalDir(false); + config.setSecurityEnabled(false); + config.setPersistenceEnabled(false); + config.setQueueConfigurations( + Collections.singletonList( + new CoreQueueConfiguration("someQueue", "someQueue", null, true))); + config.setAcceptorConfigurations( + new HashSet<>( + Collections.singletonList( + new TransportConfiguration(InVMAcceptorFactory.class.getName())))); + + server = HornetQServers.newHornetQServer(config); + server.start(); + + ServerLocator serverLocator = + HornetQClient.createServerLocatorWithoutHA( + new TransportConfiguration(InVMConnectorFactory.class.getName())); + ClientSessionFactory sf = serverLocator.createSessionFactory(); + ClientSession clientSession = sf.createSession(false, false, false); + clientSession.createQueue("jms.queue.someQueue", "jms.queue.someQueue", true); + clientSession.createQueue("jms.topic.someTopic", "jms.topic.someTopic", true); + clientSession.close(); + sf.close(); + serverLocator.close(); + + connectionFactory = + HornetQJMSClient.createConnectionFactoryWithoutHA( + JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName())); + connection = connectionFactory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.run(); + } + + @AfterAll + static void tearDown() throws Exception { + if (session != null) { + session.close(); + } + if (connection != null) { + connection.close(); + } + if (connectionFactory != null) { + connectionFactory.close(); + } + if (server != null) { + server.stop(); + } + } + + @ArgumentsSource(DestinationsProvider.class) + @ParameterizedTest + void testMessageConsumer( + DestinationFactory destinationFactory, String destinationName, boolean isTemporary) + throws JMSException { + + // given + Destination destination = destinationFactory.create(session); + TextMessage sentMessage = session.createTextMessage("a message"); + + MessageProducer producer = session.createProducer(destination); + cleanup.deferCleanup(producer); + MessageConsumer consumer = session.createConsumer(destination); + cleanup.deferCleanup(consumer); + + // when + testing.runWithSpan("producer parent", () -> producer.send(sentMessage)); + + TextMessage receivedMessage = + testing.runWithSpan("consumer parent", () -> (TextMessage) consumer.receive()); + + // then + assertThat(receivedMessage.getText()).isEqualTo(sentMessage.getText()); + + String messageId = receivedMessage.getJMSMessageID(); + + AtomicReference producerSpan = new AtomicReference<>(); + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer parent").hasNoParent(), + span -> + span.hasName(destinationName + " publish") + .hasKind(PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), + messagingTempDestination(isTemporary))); + + producerSpan.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("consumer parent").hasNoParent(), + span -> + span.hasName(destinationName + " receive") + .hasKind(CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), + messagingTempDestination(isTemporary)))); + } + + @ArgumentsSource(DestinationsProvider.class) + @ParameterizedTest + void testMessageListener( + DestinationFactory destinationFactory, String destinationName, boolean isTemporary) + throws Exception { + + // given + Destination destination = destinationFactory.create(session); + TextMessage sentMessage = session.createTextMessage("a message"); + + MessageProducer producer = session.createProducer(null); + cleanup.deferCleanup(producer); + MessageConsumer consumer = session.createConsumer(destination); + cleanup.deferCleanup(consumer); + + CompletableFuture receivedMessageFuture = new CompletableFuture<>(); + consumer.setMessageListener( + message -> + testing.runWithSpan( + "consumer", () -> receivedMessageFuture.complete((TextMessage) message))); + + // when + testing.runWithSpan("producer parent", () -> producer.send(destination, sentMessage)); + + // then + TextMessage receivedMessage = receivedMessageFuture.get(10, TimeUnit.SECONDS); + assertThat(receivedMessage.getText()).isEqualTo(sentMessage.getText()); + + String messageId = receivedMessage.getJMSMessageID(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer parent").hasNoParent(), + span -> + span.hasName(destinationName + " publish") + .hasKind(PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), + messagingTempDestination(isTemporary)), + span -> + span.hasName(destinationName + " process") + .hasKind(CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), + messagingTempDestination(isTemporary)), + span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); + } + + @ArgumentsSource(EmptyReceiveArgumentsProvider.class) + @ParameterizedTest + void shouldNotEmitTelemetryOnEmptyReceive( + DestinationFactory destinationFactory, MessageReceiver receiver) throws JMSException { + + // given + Destination destination = destinationFactory.create(session); + + MessageConsumer consumer = session.createConsumer(destination); + cleanup.deferCleanup(consumer); + + // when + Message message = receiver.receive(consumer); + + // then + assertThat(message).isNull(); + + testing.waitForTraces(0); + } + + private static AttributeAssertion messagingTempDestination(boolean isTemporary) { + return isTemporary + ? equalTo(SemanticAttributes.MESSAGING_DESTINATION_TEMPORARY, true) + : satisfies(SemanticAttributes.MESSAGING_DESTINATION_TEMPORARY, AbstractAssert::isNull); + } + + static final class EmptyReceiveArgumentsProvider implements ArgumentsProvider { + + @Override + public Stream provideArguments(ExtensionContext context) { + DestinationFactory topic = session -> session.createTopic("someTopic"); + DestinationFactory queue = session -> session.createQueue("someQueue"); + MessageReceiver receive = consumer -> consumer.receive(100); + MessageReceiver receiveNoWait = MessageConsumer::receiveNoWait; + + return Stream.of( + arguments(topic, receive), + arguments(queue, receive), + arguments(topic, receiveNoWait), + arguments(queue, receiveNoWait)); + } + } + + static final class DestinationsProvider implements ArgumentsProvider { + + @Override + public Stream provideArguments(ExtensionContext context) { + DestinationFactory topic = session -> session.createTopic("someTopic"); + DestinationFactory queue = session -> session.createQueue("someQueue"); + DestinationFactory tempTopic = Session::createTemporaryTopic; + DestinationFactory tempQueue = Session::createTemporaryQueue; + + return Stream.of( + arguments(topic, "someTopic", false), + arguments(queue, "someQueue", false), + arguments(tempTopic, "(temporary)", true), + arguments(tempQueue, "(temporary)", true)); + } + } + + @FunctionalInterface + interface DestinationFactory { + + Destination create(Session session) throws JMSException; + } + + @FunctionalInterface + interface MessageReceiver { + + Message receive(MessageConsumer consumer) throws JMSException; + } +} diff --git a/instrumentation/jms/jms-1.1/javaagent/src/test/groovy/Jms1Test.groovy b/instrumentation/jms/jms-1.1/javaagent/src/test/groovy/Jms1Test.groovy deleted file mode 100644 index 7672517cbf..0000000000 --- a/instrumentation/jms/jms-1.1/javaagent/src/test/groovy/Jms1Test.groovy +++ /dev/null @@ -1,382 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import io.opentelemetry.instrumentation.test.asserts.TraceAssert -import io.opentelemetry.sdk.trace.data.SpanData -import io.opentelemetry.semconv.SemanticAttributes -import org.apache.activemq.ActiveMQConnectionFactory -import org.apache.activemq.command.ActiveMQTextMessage -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import org.testcontainers.containers.GenericContainer -import org.testcontainers.containers.output.Slf4jLogConsumer -import spock.lang.Shared -import spock.lang.Unroll - -import javax.jms.Connection -import javax.jms.Message -import javax.jms.MessageListener -import javax.jms.Session -import javax.jms.TextMessage -import java.util.concurrent.CountDownLatch -import java.util.concurrent.atomic.AtomicReference - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import static io.opentelemetry.api.trace.SpanKind.PRODUCER - -@Unroll -class Jms1Test extends AgentInstrumentationSpecification { - - private static final Logger logger = LoggerFactory.getLogger(Jms1Test) - - private static final GenericContainer broker = new GenericContainer("rmohr/activemq:latest") - .withExposedPorts(61616, 8161) - .withLogConsumer(new Slf4jLogConsumer(logger)) - - @Shared - String messageText = "a message" - @Shared - Session session - - ActiveMQTextMessage message = session.createTextMessage(messageText) - - def setupSpec() { - broker.start() - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + broker.getMappedPort(61616)) - - Connection connection = connectionFactory.createConnection() - connection.start() - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) - } - - def cleanupSpec() { - broker.stop() - } - - def "sending a message to #destinationName generates spans"() { - setup: - def producer = session.createProducer(destination) - def consumer = session.createConsumer(destination) - - runWithSpan("producer parent") { - producer.send(message) - } - - TextMessage receivedMessage = runWithSpan("consumer parent") { - return consumer.receive() as TextMessage - } - String messageId = receivedMessage.getJMSMessageID() - - expect: - receivedMessage.text == messageText - assertTraces(2) { - SpanData producerSpanData - trace(0, 2) { - span(0) { - name "producer parent" - hasNoParent() - } - producerSpan(it, 1, destinationName, span(0)) - - producerSpanData = span(1) - } - trace(1, 2) { - span(0) { - name "consumer parent" - hasNoParent() - } - consumerSpan(it, 1, destinationName, messageId, "receive", span(0), producerSpanData) - } - } - - cleanup: - producer.close() - consumer.close() - - where: - destination | destinationName - session.createQueue("someQueue") | "someQueue" - session.createTopic("someTopic") | "someTopic" - session.createTemporaryQueue() | "(temporary)" - session.createTemporaryTopic() | "(temporary)" - } - - def "sending to a MessageListener on #destinationName generates a span"() { - setup: - def lock = new CountDownLatch(1) - def messageRef = new AtomicReference() - def producer = session.createProducer(destination) - def consumer = session.createConsumer(destination) - consumer.setMessageListener new MessageListener() { - @Override - void onMessage(Message message) { - lock.await() // ensure the producer trace is reported first. - messageRef.set(message as TextMessage) - } - } - - producer.send(message) - lock.countDown() - - expect: - assertTraces(1) { - trace(0, 2) { - producerSpan(it, 0, destinationName) - consumerSpan(it, 1, destinationName, messageRef.get().getJMSMessageID(), "process", span(0)) - } - } - // This check needs to go after all traces have been accounted for - messageRef.get().text == messageText - - cleanup: - producer.close() - consumer.close() - - where: - destination | destinationName - session.createQueue("someQueue") | "someQueue" - session.createTopic("someTopic") | "someTopic" - session.createTemporaryQueue() | "(temporary)" - session.createTemporaryTopic() | "(temporary)" - } - - def "failing to receive message with receiveNoWait on #destinationName works"() { - setup: - def consumer = session.createConsumer(destination) - - // Receive with timeout - Message receivedMessage = consumer.receiveNoWait() - - expect: - receivedMessage == null - // span is not created if no message is received - assertTraces(0) {} - - cleanup: - consumer.close() - - where: - destination | destinationName - session.createQueue("someQueue") | "someQueue" - session.createTopic("someTopic") | "someTopic" - } - - def "failing to receive message with wait(timeout) on #destinationName works"() { - setup: - def consumer = session.createConsumer(destination) - - // Receive with timeout - Message receivedMessage = consumer.receive(100) - - expect: - receivedMessage == null - // span is not created if no message is received - assertTraces(0) {} - - cleanup: - consumer.close() - - where: - destination | destinationName - session.createQueue("someQueue") | "someQueue" - session.createTopic("someTopic") | "someTopic" - } - - def "sending a read-only message to #destinationName fails"() { - setup: - def producer = session.createProducer(destination) - def consumer = session.createConsumer(destination) - - expect: - !message.isReadOnlyProperties() - - when: - message.setReadOnlyProperties(true) - and: - producer.send(message) - - TextMessage receivedMessage = consumer.receive() as TextMessage - - then: - receivedMessage.text == messageText - - // This will result in a logged failure because we tried to - // write properties in MessagePropertyTextMap when readOnlyProperties = true. - // The consumer span will also not be linked to the parent. - assertTraces(2) { - trace(0, 1) { - producerSpan(it, 0, destinationName) - } - trace(1, 1) { - consumerSpan(it, 0, destinationName, "", "receive", null) - } - } - - cleanup: - producer.close() - consumer.close() - - where: - destination | destinationName - session.createQueue("someQueue") | "someQueue" - session.createTopic("someTopic") | "someTopic" - session.createTemporaryQueue() | "(temporary)" - session.createTemporaryTopic() | "(temporary)" - } - - def "sending a message to #destinationName with explicit destination propagates context"() { - given: - def producer = session.createProducer(null) - def consumer = session.createConsumer(destination) - - def lock = new CountDownLatch(1) - def messageRef = new AtomicReference() - consumer.setMessageListener new MessageListener() { - @Override - void onMessage(Message message) { - lock.await() // ensure the producer trace is reported first. - messageRef.set(message as TextMessage) - } - } - - when: - runWithSpan("parent") { - producer.send(destination, message) - } - lock.countDown() - - then: - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - hasNoParent() - } - producerSpan(it, 1, destinationName, span(0)) - consumerSpan(it, 2, destinationName, messageRef.get().getJMSMessageID(), "process", span(1)) - } - } - // This check needs to go after all traces have been accounted for - messageRef.get().text == messageText - - cleanup: - producer.close() - consumer.close() - - where: - destination | destinationName - session.createQueue("someQueue") | "someQueue" - session.createTopic("someTopic") | "someTopic" - session.createTemporaryQueue() | "(temporary)" - session.createTemporaryTopic() | "(temporary)" - } - - def "capture message header as span attribute"() { - setup: - def destinationName = "someQueue" - def destination = session.createQueue(destinationName) - def producer = session.createProducer(destination) - def consumer = session.createConsumer(destination) - - def message = session.createTextMessage(messageText) - message.setStringProperty("test-message-header", "test") - message.setIntProperty("test-message-int-header", 1234) - runWithSpan("producer parent") { - producer.send(message) - } - - TextMessage receivedMessage = runWithSpan("consumer parent") { - return consumer.receive() as TextMessage - } - String messageId = receivedMessage.getJMSMessageID() - - expect: - receivedMessage.text == messageText - assertTraces(2) { - SpanData producerSpanData - trace(0, 2) { - span(0) { - name "producer parent" - hasNoParent() - } - producerSpan(it, 1, destinationName, span(0), true) - - producerSpanData = span(1) - } - trace(1, 2) { - span(0) { - name "consumer parent" - hasNoParent() - } - consumerSpan(it, 1, destinationName, messageId, "receive", span(0), producerSpanData, true) - } - } - - cleanup: - producer.close() - consumer.close() - } - - static producerSpan(TraceAssert trace, int index, String destinationName, SpanData parentSpan = null, boolean testHeaders = false) { - trace.span(index) { - name destinationName + " publish" - kind PRODUCER - if (parentSpan == null) { - hasNoParent() - } else { - childOf(parentSpan) - } - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "jms" - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" destinationName - if (destinationName == "(temporary)") { - "$SemanticAttributes.MESSAGING_DESTINATION_TEMPORARY" true - } - "$SemanticAttributes.MESSAGING_MESSAGE_ID" String - if (testHeaders) { - "messaging.header.test_message_header" { it == ["test"] } - "messaging.header.test_message_int_header" { it == ["1234"] } - } - } - } - } - - // passing messageId = null will verify message.id is not captured, - // passing messageId = "" will verify message.id is captured (but won't verify anything about the value), - // any other value for messageId will verify that message.id is captured and has that same value - static consumerSpan(TraceAssert trace, int index, String destinationName, String messageId, String operation, SpanData parentSpan, SpanData linkedSpan = null, boolean testHeaders = false) { - trace.span(index) { - name destinationName + " " + operation - kind CONSUMER - if (parentSpan == null) { - hasNoParent() - } else { - childOf(parentSpan) - } - if (linkedSpan == null) { - hasNoLinks() - } else { - hasLink(linkedSpan) - } - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "jms" - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" destinationName - "$SemanticAttributes.MESSAGING_OPERATION" operation - if (messageId != null) { - //In some tests we don't know exact messageId, so we pass "" and verify just the existence of the attribute - "$SemanticAttributes.MESSAGING_MESSAGE_ID" { it == messageId || messageId == "" } - } - if (destinationName == "(temporary)") { - "$SemanticAttributes.MESSAGING_DESTINATION_TEMPORARY" true - } - if (testHeaders) { - "messaging.header.test_message_header" { it == ["test"] } - "messaging.header.test_message_int_header" { it == ["1234"] } - } - } - } - } -} diff --git a/instrumentation/jms/jms-1.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v1_1/Jms1InstrumentationTest.java b/instrumentation/jms/jms-1.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v1_1/Jms1InstrumentationTest.java new file mode 100644 index 0000000000..9c6c828649 --- /dev/null +++ b/instrumentation/jms/jms-1.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/jms/v1_1/Jms1InstrumentationTest.java @@ -0,0 +1,409 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.jms.v1_1; + +import static io.opentelemetry.api.common.AttributeKey.stringArrayKey; +import static io.opentelemetry.api.trace.SpanKind.CONSUMER; +import static io.opentelemetry.api.trace.SpanKind.PRODUCER; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.semconv.SemanticAttributes; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.assertj.core.api.AbstractAssert; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; + +public class Jms1InstrumentationTest { + + static final Logger logger = LoggerFactory.getLogger(Jms1InstrumentationTest.class); + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create(); + + static GenericContainer broker; + static ActiveMQConnectionFactory connectionFactory; + static Connection connection; + static Session session; + + @BeforeAll + static void setUp() throws JMSException { + broker = + new GenericContainer<>("rmohr/activemq:latest") + .withExposedPorts(61616, 8161) + .withLogConsumer(new Slf4jLogConsumer(logger)); + broker.start(); + + connectionFactory = + new ActiveMQConnectionFactory("tcp://localhost:" + broker.getMappedPort(61616)); + Connection connection = connectionFactory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + @AfterAll + static void tearDown() throws JMSException { + if (session != null) { + session.close(); + } + if (connection != null) { + connection.close(); + } + if (broker != null) { + broker.close(); + } + } + + @ArgumentsSource(DestinationsProvider.class) + @ParameterizedTest + void testMessageConsumer( + DestinationFactory destinationFactory, String destinationName, boolean isTemporary) + throws JMSException { + + // given + Destination destination = destinationFactory.create(session); + TextMessage sentMessage = session.createTextMessage("a message"); + + MessageProducer producer = session.createProducer(destination); + cleanup.deferCleanup(producer::close); + MessageConsumer consumer = session.createConsumer(destination); + cleanup.deferCleanup(consumer::close); + + // when + testing.runWithSpan("producer parent", () -> producer.send(sentMessage)); + + TextMessage receivedMessage = + testing.runWithSpan("consumer parent", () -> (TextMessage) consumer.receive()); + + // then + assertThat(receivedMessage.getText()).isEqualTo(sentMessage.getText()); + + String messageId = receivedMessage.getJMSMessageID(); + + AtomicReference producerSpan = new AtomicReference<>(); + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer parent").hasNoParent(), + span -> + span.hasName(destinationName + " publish") + .hasKind(PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), + messagingTempDestination(isTemporary))); + + producerSpan.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("consumer parent").hasNoParent(), + span -> + span.hasName(destinationName + " receive") + .hasKind(CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), + messagingTempDestination(isTemporary)))); + } + + @ArgumentsSource(DestinationsProvider.class) + @ParameterizedTest + void testMessageListener( + DestinationFactory destinationFactory, String destinationName, boolean isTemporary) + throws Exception { + + // given + Destination destination = destinationFactory.create(session); + TextMessage sentMessage = session.createTextMessage("a message"); + + MessageProducer producer = session.createProducer(null); + cleanup.deferCleanup(producer::close); + MessageConsumer consumer = session.createConsumer(destination); + cleanup.deferCleanup(consumer::close); + + CompletableFuture receivedMessageFuture = new CompletableFuture<>(); + consumer.setMessageListener( + message -> + testing.runWithSpan( + "consumer", () -> receivedMessageFuture.complete((TextMessage) message))); + + // when + testing.runWithSpan("producer parent", () -> producer.send(destination, sentMessage)); + + // then + TextMessage receivedMessage = receivedMessageFuture.get(10, TimeUnit.SECONDS); + assertThat(receivedMessage.getText()).isEqualTo(sentMessage.getText()); + + String messageId = receivedMessage.getJMSMessageID(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer parent").hasNoParent(), + span -> + span.hasName(destinationName + " publish") + .hasKind(PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), + messagingTempDestination(isTemporary)), + span -> + span.hasName(destinationName + " process") + .hasKind(CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), + messagingTempDestination(isTemporary)), + span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); + } + + @ArgumentsSource(EmptyReceiveArgumentsProvider.class) + @ParameterizedTest + void shouldNotEmitTelemetryOnEmptyReceive( + DestinationFactory destinationFactory, MessageReceiver receiver) throws JMSException { + + // given + Destination destination = destinationFactory.create(session); + + MessageConsumer consumer = session.createConsumer(destination); + cleanup.deferCleanup(consumer::close); + + // when + Message message = receiver.receive(consumer); + + // then + assertThat(message).isNull(); + + testing.waitForTraces(0); + } + + @ArgumentsSource(DestinationsProvider.class) + @ParameterizedTest + void shouldCaptureMessageHeaders( + DestinationFactory destinationFactory, String destinationName, boolean isTemporary) + throws Exception { + + // given + Destination destination = destinationFactory.create(session); + TextMessage sentMessage = session.createTextMessage("a message"); + sentMessage.setStringProperty("test_message_header", "test"); + sentMessage.setIntProperty("test_message_int_header", 1234); + + MessageProducer producer = session.createProducer(destination); + cleanup.deferCleanup(producer::close); + MessageConsumer consumer = session.createConsumer(destination); + cleanup.deferCleanup(consumer::close); + + CompletableFuture receivedMessageFuture = new CompletableFuture<>(); + consumer.setMessageListener( + message -> + testing.runWithSpan( + "consumer", () -> receivedMessageFuture.complete((TextMessage) message))); + + // when + testing.runWithSpan("producer parent", () -> producer.send(sentMessage)); + + // then + TextMessage receivedMessage = receivedMessageFuture.get(10, TimeUnit.SECONDS); + assertThat(receivedMessage.getText()).isEqualTo(sentMessage.getText()); + + String messageId = receivedMessage.getJMSMessageID(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer parent").hasNoParent(), + span -> + span.hasName(destinationName + " publish") + .hasKind(PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), + messagingTempDestination(isTemporary), + equalTo( + stringArrayKey("messaging.header.test_message_header"), + singletonList("test")), + equalTo( + stringArrayKey("messaging.header.test_message_int_header"), + singletonList("1234"))), + span -> + span.hasName(destinationName + " process") + .hasKind(CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), + messagingTempDestination(isTemporary), + equalTo( + stringArrayKey("messaging.header.test_message_header"), + singletonList("test")), + equalTo( + stringArrayKey("messaging.header.test_message_int_header"), + singletonList("1234"))), + span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); + } + + @ArgumentsSource(DestinationsProvider.class) + @ParameterizedTest + void shouldFailWhenSendingReadOnlyMessage( + DestinationFactory destinationFactory, String destinationName, boolean isTemporary) + throws Exception { + + // given + Destination destination = destinationFactory.create(session); + ActiveMQTextMessage sentMessage = (ActiveMQTextMessage) session.createTextMessage("a message"); + + MessageProducer producer = session.createProducer(destination); + cleanup.deferCleanup(producer::close); + MessageConsumer consumer = session.createConsumer(destination); + cleanup.deferCleanup(consumer::close); + + sentMessage.setReadOnlyProperties(true); + + // when + testing.runWithSpan("producer parent", () -> producer.send(sentMessage)); + + TextMessage receivedMessage = (TextMessage) consumer.receive(); + + // then + assertThat(receivedMessage.getText()).isEqualTo(sentMessage.getText()); + + String messageId = receivedMessage.getJMSMessageID(); + + // This will result in a logged failure because we tried to + // write properties in MessagePropertyTextMap when readOnlyProperties = true. + // As a result, the consumer span will not be linked to the producer span as we are unable to + // propagate the trace context as a message property. + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer parent").hasNoParent(), + span -> + span.hasName(destinationName + " publish") + .hasKind(PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), + messagingTempDestination(isTemporary))), + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName(destinationName + " receive") + .hasKind(CONSUMER) + .hasNoParent() + .hasTotalRecordedLinks(0) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), + messagingTempDestination(isTemporary)))); + } + + private static AttributeAssertion messagingTempDestination(boolean isTemporary) { + return isTemporary + ? equalTo(SemanticAttributes.MESSAGING_DESTINATION_TEMPORARY, true) + : satisfies(SemanticAttributes.MESSAGING_DESTINATION_TEMPORARY, AbstractAssert::isNull); + } + + static final class EmptyReceiveArgumentsProvider implements ArgumentsProvider { + + @Override + public Stream provideArguments(ExtensionContext context) { + DestinationFactory topic = session -> session.createTopic("someTopic"); + DestinationFactory queue = session -> session.createQueue("someQueue"); + MessageReceiver receive = consumer -> consumer.receive(100); + MessageReceiver receiveNoWait = MessageConsumer::receiveNoWait; + + return Stream.of( + arguments(topic, receive), + arguments(queue, receive), + arguments(topic, receiveNoWait), + arguments(queue, receiveNoWait)); + } + } + + static final class DestinationsProvider implements ArgumentsProvider { + + @Override + public Stream provideArguments(ExtensionContext context) { + DestinationFactory topic = session -> session.createTopic("someTopic"); + DestinationFactory queue = session -> session.createQueue("someQueue"); + DestinationFactory tempTopic = Session::createTemporaryTopic; + DestinationFactory tempQueue = Session::createTemporaryQueue; + + return Stream.of( + arguments(topic, "someTopic", false), + arguments(queue, "someQueue", false), + arguments(tempTopic, "(temporary)", true), + arguments(tempQueue, "(temporary)", true)); + } + } + + @FunctionalInterface + interface DestinationFactory { + + Destination create(Session session) throws JMSException; + } + + @FunctionalInterface + interface MessageReceiver { + + Message receive(MessageConsumer consumer) throws JMSException; + } +}