Fix UnsupportedOperationException happening when reactor-rabbitmq is used (#3381)

This commit is contained in:
Mateusz Rzeszutek 2021-06-23 01:27:06 +02:00 committed by GitHub
parent 0788a036f9
commit 2c2c19d293
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 96 additions and 25 deletions

View File

@ -15,6 +15,10 @@ dependencies {
testLibrary ("org.springframework.amqp:spring-rabbit:1.1.0.RELEASE") {
exclude group: 'com.rabbitmq', module: 'amqp-client'
}
testInstrumentation project(':instrumentation:reactor-3.1:javaagent')
testLibrary 'io.projectreactor.rabbitmq:reactor-rabbitmq:1.0.0.RELEASE'
}
tasks.withType(Test).configureEach {

View File

@ -50,7 +50,9 @@ public class RabbitChannelInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return implementsInterface(named("com.rabbitmq.client.Channel"));
return implementsInterface(named("com.rabbitmq.client.Channel"))
// broken implementation that throws UnsupportedOperationException on getConnection() calls
.and(not(named("reactor.rabbitmq.ChannelProxy")));
}
@Override

View File

@ -12,7 +12,6 @@ import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTra
import com.rabbitmq.client.AMQP
import com.rabbitmq.client.Channel
import com.rabbitmq.client.Connection
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.Consumer
import com.rabbitmq.client.DefaultConsumer
import com.rabbitmq.client.Envelope
@ -23,42 +22,24 @@ import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.time.Duration
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.amqp.core.Queue
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory
import org.springframework.amqp.rabbit.core.RabbitAdmin
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.testcontainers.containers.GenericContainer
import spock.lang.Shared
class RabbitMQTest extends AgentInstrumentationSpecification {
class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabbitMqTrait {
@Shared
def rabbitMQContainer
@Shared
InetSocketAddress rabbitmqAddress
ConnectionFactory factory = new ConnectionFactory(host: rabbitmqAddress.hostName, port: rabbitmqAddress.port)
Connection conn = factory.newConnection()
Connection conn = connectionFactory.newConnection()
Channel channel = conn.createChannel()
def setupSpec() {
rabbitMQContainer = new GenericContainer('rabbitmq:latest')
.withExposedPorts(5672)
.withStartupTimeout(Duration.ofSeconds(120))
rabbitMQContainer.start()
rabbitmqAddress = new InetSocketAddress(
rabbitMQContainer.containerIpAddress,
rabbitMQContainer.getMappedPort(5672)
)
startRabbit()
}
def cleanupSpec() {
if (rabbitMQContainer) {
rabbitMQContainer.stop()
}
stopRabbit()
}
def cleanup() {
@ -266,7 +247,7 @@ class RabbitMQTest extends AgentInstrumentationSpecification {
def "test spring rabbit"() {
setup:
def connectionFactory = new CachingConnectionFactory(rabbitmqAddress.hostName, rabbitmqAddress.port)
def connectionFactory = new CachingConnectionFactory(connectionFactory)
AmqpAdmin admin = new RabbitAdmin(connectionFactory)
def queue = new Queue("some-routing-queue", false, true, true, null)
admin.declareQueue(queue)

View File

@ -0,0 +1,54 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import reactor.rabbitmq.ExchangeSpecification
import reactor.rabbitmq.RabbitFlux
import reactor.rabbitmq.SenderOptions
class ReactorRabbitMqTest extends AgentInstrumentationSpecification implements WithRabbitMqTrait {
def setupSpec() {
startRabbit()
}
def cleanupSpec() {
stopRabbit()
}
def "should not fail declaring exchange"() {
given:
def sender = RabbitFlux.createSender(new SenderOptions().connectionFactory(connectionFactory))
when:
sender.declareExchange(ExchangeSpecification.exchange("testExchange"))
.block()
then:
noExceptionThrown()
assertTraces(1) {
trace(0, 1) {
span(0) {
name 'exchange.declare'
kind SpanKind.CLIENT
attributes {
"${SemanticAttributes.NET_PEER_NAME.key}" { it == null || it instanceof String }
"${SemanticAttributes.NET_PEER_IP.key}" String
"${SemanticAttributes.NET_PEER_PORT.key}" { it == null || it instanceof Long }
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "rabbitmq"
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "queue"
"rabbitmq.command" "exchange.declare"
}
}
}
}
cleanup:
sender?.close()
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import com.rabbitmq.client.ConnectionFactory
import java.time.Duration
import org.testcontainers.containers.GenericContainer
trait WithRabbitMqTrait {
static GenericContainer rabbitMqContainer
static ConnectionFactory connectionFactory
def startRabbit() {
rabbitMqContainer = new GenericContainer('rabbitmq:latest')
.withExposedPorts(5672)
.withStartupTimeout(Duration.ofSeconds(120))
rabbitMqContainer.start()
connectionFactory = new ConnectionFactory(
host: rabbitMqContainer.containerIpAddress,
port: rabbitMqContainer.getMappedPort(5672)
)
}
def stopRabbit() {
rabbitMqContainer?.stop()
}
}