diff --git a/instrumentation/rabbitmq-2.7/javaagent/rabbitmq-2.7-javaagent.gradle b/instrumentation/rabbitmq-2.7/javaagent/rabbitmq-2.7-javaagent.gradle index c461945d83..f8b77dc736 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/rabbitmq-2.7-javaagent.gradle +++ b/instrumentation/rabbitmq-2.7/javaagent/rabbitmq-2.7-javaagent.gradle @@ -15,6 +15,10 @@ dependencies { testLibrary ("org.springframework.amqp:spring-rabbit:1.1.0.RELEASE") { exclude group: 'com.rabbitmq', module: 'amqp-client' } + + testInstrumentation project(':instrumentation:reactor-3.1:javaagent') + + testLibrary 'io.projectreactor.rabbitmq:reactor-rabbitmq:1.0.0.RELEASE' } tasks.withType(Test).configureEach { diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java index 595f146816..d7fb3a315e 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java @@ -50,7 +50,9 @@ public class RabbitChannelInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { - return implementsInterface(named("com.rabbitmq.client.Channel")); + return implementsInterface(named("com.rabbitmq.client.Channel")) + // broken implementation that throws UnsupportedOperationException on getConnection() calls + .and(not(named("reactor.rabbitmq.ChannelProxy"))); } @Override diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/RabbitMQTest.groovy b/instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/RabbitMqTest.groovy similarity index 92% rename from instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/RabbitMQTest.groovy rename to instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/RabbitMqTest.groovy index d8eab43b1b..e5045617da 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/RabbitMQTest.groovy +++ b/instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/RabbitMqTest.groovy @@ -12,7 +12,6 @@ import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTra import com.rabbitmq.client.AMQP import com.rabbitmq.client.Channel import com.rabbitmq.client.Connection -import com.rabbitmq.client.ConnectionFactory import com.rabbitmq.client.Consumer import com.rabbitmq.client.DefaultConsumer import com.rabbitmq.client.Envelope @@ -23,42 +22,24 @@ import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification import io.opentelemetry.instrumentation.test.asserts.TraceAssert import io.opentelemetry.sdk.trace.data.SpanData import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import java.time.Duration import org.springframework.amqp.core.AmqpAdmin import org.springframework.amqp.core.AmqpTemplate import org.springframework.amqp.core.Queue import org.springframework.amqp.rabbit.connection.CachingConnectionFactory import org.springframework.amqp.rabbit.core.RabbitAdmin import org.springframework.amqp.rabbit.core.RabbitTemplate -import org.testcontainers.containers.GenericContainer -import spock.lang.Shared -class RabbitMQTest extends AgentInstrumentationSpecification { +class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabbitMqTrait { - @Shared - def rabbitMQContainer - @Shared - InetSocketAddress rabbitmqAddress - - ConnectionFactory factory = new ConnectionFactory(host: rabbitmqAddress.hostName, port: rabbitmqAddress.port) - Connection conn = factory.newConnection() + Connection conn = connectionFactory.newConnection() Channel channel = conn.createChannel() def setupSpec() { - rabbitMQContainer = new GenericContainer('rabbitmq:latest') - .withExposedPorts(5672) - .withStartupTimeout(Duration.ofSeconds(120)) - rabbitMQContainer.start() - rabbitmqAddress = new InetSocketAddress( - rabbitMQContainer.containerIpAddress, - rabbitMQContainer.getMappedPort(5672) - ) + startRabbit() } def cleanupSpec() { - if (rabbitMQContainer) { - rabbitMQContainer.stop() - } + stopRabbit() } def cleanup() { @@ -266,7 +247,7 @@ class RabbitMQTest extends AgentInstrumentationSpecification { def "test spring rabbit"() { setup: - def connectionFactory = new CachingConnectionFactory(rabbitmqAddress.hostName, rabbitmqAddress.port) + def connectionFactory = new CachingConnectionFactory(connectionFactory) AmqpAdmin admin = new RabbitAdmin(connectionFactory) def queue = new Queue("some-routing-queue", false, true, true, null) admin.declareQueue(queue) diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/ReactorRabbitMqTest.groovy b/instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/ReactorRabbitMqTest.groovy new file mode 100644 index 0000000000..9b070587d4 --- /dev/null +++ b/instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/ReactorRabbitMqTest.groovy @@ -0,0 +1,54 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.api.trace.SpanKind +import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import reactor.rabbitmq.ExchangeSpecification +import reactor.rabbitmq.RabbitFlux +import reactor.rabbitmq.SenderOptions + +class ReactorRabbitMqTest extends AgentInstrumentationSpecification implements WithRabbitMqTrait { + + def setupSpec() { + startRabbit() + } + + def cleanupSpec() { + stopRabbit() + } + + def "should not fail declaring exchange"() { + given: + def sender = RabbitFlux.createSender(new SenderOptions().connectionFactory(connectionFactory)) + + when: + sender.declareExchange(ExchangeSpecification.exchange("testExchange")) + .block() + + then: + noExceptionThrown() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name 'exchange.declare' + kind SpanKind.CLIENT + attributes { + "${SemanticAttributes.NET_PEER_NAME.key}" { it == null || it instanceof String } + "${SemanticAttributes.NET_PEER_IP.key}" String + "${SemanticAttributes.NET_PEER_PORT.key}" { it == null || it instanceof Long } + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rabbitmq" + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "queue" + "rabbitmq.command" "exchange.declare" + } + } + } + } + + cleanup: + sender?.close() + } +} diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/WithRabbitMqTrait.groovy b/instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/WithRabbitMqTrait.groovy new file mode 100644 index 0000000000..73b7753804 --- /dev/null +++ b/instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/WithRabbitMqTrait.groovy @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import com.rabbitmq.client.ConnectionFactory +import java.time.Duration +import org.testcontainers.containers.GenericContainer + +trait WithRabbitMqTrait { + + static GenericContainer rabbitMqContainer + static ConnectionFactory connectionFactory + + def startRabbit() { + rabbitMqContainer = new GenericContainer('rabbitmq:latest') + .withExposedPorts(5672) + .withStartupTimeout(Duration.ofSeconds(120)) + rabbitMqContainer.start() + + connectionFactory = new ConnectionFactory( + host: rabbitMqContainer.containerIpAddress, + port: rabbitMqContainer.getMappedPort(5672) + ) + } + + def stopRabbit() { + rabbitMqContainer?.stop() + } +} \ No newline at end of file