Make @RabbitListener propagate context properly (#3339)

This commit is contained in:
Mateusz Rzeszutek 2021-06-17 19:38:53 +02:00 committed by GitHub
parent 97254f04d1
commit 108b1298cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 417 additions and 0 deletions

View File

@ -0,0 +1,20 @@
apply from: "$rootDir/gradle/instrumentation.gradle"
muzzle {
pass {
group = 'org.springframework.amqp'
module = 'spring-rabbit'
versions = "(,)"
}
}
dependencies {
library 'org.springframework.amqp:spring-rabbit:1.0.0.RELEASE'
testInstrumentation project(':instrumentation:rabbitmq-2.7:javaagent')
// 2.1.7 adds the @RabbitListener annotation, we need that for tests
testLibrary 'org.springframework.amqp:spring-rabbit:2.1.7.RELEASE'
testLibrary "org.springframework.boot:spring-boot-starter-test:1.5.22.RELEASE"
testLibrary "org.springframework.boot:spring-boot-starter:1.5.22.RELEASE"
}

View File

@ -0,0 +1,73 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.rabbit;
import static io.opentelemetry.javaagent.instrumentation.spring.rabbit.SpringRabbitSingletons.instrumenter;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.springframework.amqp.core.Message;
public class AbstractMessageListenerContainerInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("invokeListener")
.and(
takesArguments(2)
.and(
takesArgument(1, Object.class)
.or(takesArgument(1, named("org.springframework.amqp.core.Message"))))),
getClass().getName() + "$InvokeListenerAdvice");
}
@SuppressWarnings("unused")
public static class InvokeListenerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(1) Object data,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
if (!(data instanceof Message)) {
return;
}
Context parentContext = Java8BytecodeBridge.currentContext();
Message message = (Message) data;
if (instrumenter().shouldStart(parentContext, message)) {
context = instrumenter().start(parentContext, message);
scope = context.makeCurrent();
}
}
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void onEnter(
@Advice.Argument(1) Object data,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope,
@Advice.Thrown Throwable throwable) {
if (scope == null || !(data instanceof Message)) {
return;
}
scope.close();
instrumenter().end(context, (Message) data, null, throwable);
}
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.rabbit;
import io.opentelemetry.context.propagation.TextMapGetter;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.amqp.core.Message;
final class MessageHeaderGetter implements TextMapGetter<Message> {
@Override
public Iterable<String> keys(Message carrier) {
return carrier.getMessageProperties().getHeaders().keySet();
}
@Nullable
@Override
public String get(Message carrier, String key) {
Object value = carrier.getMessageProperties().getHeaders().get(key);
return value == null ? null : value.toString();
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.rabbit;
import static java.util.Collections.singletonList;
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;
@AutoService(InstrumentationModule.class)
public class SpringRabbitInstrumentationModule extends InstrumentationModule {
public SpringRabbitInstrumentationModule() {
super("spring-rabbit", "spring-rabbit-1.0");
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new AbstractMessageListenerContainerInstrumentation());
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.rabbit;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.amqp.core.Message;
final class SpringRabbitMessageAttributesExtractor
extends MessagingAttributesExtractor<Message, Void> {
@Override
protected String system(Message message) {
return "rabbitmq";
}
@Override
protected String destinationKind(Message message) {
return "queue";
}
@Override
protected @Nullable String destination(Message message) {
return message.getMessageProperties().getReceivedRoutingKey();
}
@Override
protected boolean temporaryDestination(Message message) {
return false;
}
@Override
protected @Nullable String protocol(Message message) {
return null;
}
@Override
protected @Nullable String protocolVersion(Message message) {
return null;
}
@Override
protected @Nullable String url(Message message) {
return null;
}
@Override
protected @Nullable String conversationId(Message message) {
return null;
}
@Override
protected Long messagePayloadSize(Message message) {
return message.getMessageProperties().getContentLength();
}
@Override
protected @Nullable Long messagePayloadCompressedSize(Message message) {
return null;
}
@Override
protected MessageOperation operation(Message message) {
return MessageOperation.PROCESS;
}
@Override
protected @Nullable String messageId(Message message, @Nullable Void unused) {
return message.getMessageProperties().getMessageId();
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.rabbit;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import org.springframework.amqp.core.Message;
public final class SpringRabbitSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.javaagent.spring-rabbit-1.0";
private static final Instrumenter<Message, Void> INSTRUMENTER;
static {
SpringRabbitMessageAttributesExtractor attributesExtractor =
new SpringRabbitMessageAttributesExtractor();
SpanNameExtractor<Message> spanNameExtractor =
MessagingSpanNameExtractor.create(attributesExtractor);
INSTRUMENTER =
Instrumenter.<Message, Void>newBuilder(
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractor(attributesExtractor)
.newConsumerInstrumenter(new MessageHeaderGetter());
}
public static Instrumenter<Message, Void> instrumenter() {
return INSTRUMENTER;
}
private SpringRabbitSingletons() {}
}

View File

@ -0,0 +1,162 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runInternalSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
import com.rabbitmq.client.ConnectionFactory
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.time.Duration
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.amqp.core.Queue
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.boot.SpringApplication
import org.springframework.boot.SpringBootConfiguration
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.context.ConfigurableApplicationContext
import org.springframework.context.annotation.Bean
import org.testcontainers.containers.GenericContainer
import spock.lang.Shared
class ContextPropagationTest extends AgentInstrumentationSpecification {
@Shared
GenericContainer rabbitMqContainer
@Shared
ConfigurableApplicationContext applicationContext
@Shared
ConnectionFactory connectionFactory
def setupSpec() {
rabbitMqContainer = new GenericContainer('rabbitmq:latest')
.withExposedPorts(5672)
.withStartupTimeout(Duration.ofSeconds(120))
rabbitMqContainer.start()
def app = new SpringApplication(ConsumerConfig)
app.setDefaultProperties([
"spring.jmx.enabled" : false,
"spring.main.web-application-type": "none",
"spring.rabbitmq.host" : rabbitMqContainer.containerIpAddress,
"spring.rabbitmq.port" : rabbitMqContainer.getMappedPort(5672),
])
applicationContext = app.run()
connectionFactory = new ConnectionFactory(
host: rabbitMqContainer.containerIpAddress,
port: rabbitMqContainer.getMappedPort(5672)
)
}
def cleanupSpec() {
rabbitMqContainer?.stop()
applicationContext?.close()
}
def "should propagate context to consumer"() {
given:
def connection = connectionFactory.newConnection()
def channel = connection.createChannel()
when:
runUnderTrace("parent") {
applicationContext.getBean(AmqpTemplate)
.convertAndSend(ConsumerConfig.TEST_QUEUE, "test")
}
then:
assertTraces(2) {
trace(0, 5) {
span(0) {
name "parent"
}
span(1) {
// created by rabbitmq instrumentation
name "<default> -> testQueue send"
kind PRODUCER
childOf span(0)
attributes {
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_PORT.key}" Long
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "rabbitmq"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" "<default>"
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "queue"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
}
}
span(2) {
// created by rabbitmq instrumentation
name "testQueue process"
kind CONSUMER
childOf span(1)
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "rabbitmq"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" "<default>"
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "queue"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
}
}
span(3) {
// created by spring-amqp instrumentation
name "testQueue process"
kind CONSUMER
childOf span(1)
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "rabbitmq"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" "testQueue"
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "queue"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
}
}
span(4) {
name "consumer"
childOf span(3)
}
}
trace(1, 1) {
span(0) {
// created by rabbitmq instrumentation
name "basic.ack"
kind CLIENT
attributes {
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_PORT.key}" Long
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "rabbitmq"
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "queue"
}
}
}
}
cleanup:
channel?.close()
connection?.close()
}
@SpringBootConfiguration
@EnableAutoConfiguration
static class ConsumerConfig {
static final String TEST_QUEUE = "testQueue"
@Bean
Queue testQueue() {
new Queue(TEST_QUEUE)
}
@RabbitListener(queues = TEST_QUEUE)
void consume(String ignored) {
runInternalSpan("consumer")
}
}
}

View File

@ -282,6 +282,7 @@ include ':instrumentation:spring:spring-data-1.8:javaagent'
include ':instrumentation:spring:spring-integration-4.1:javaagent'
include ':instrumentation:spring:spring-integration-4.1:library'
include ':instrumentation:spring:spring-integration-4.1:testing'
include ':instrumentation:spring:spring-rabbit-1.0:javaagent'
include ':instrumentation:spring:spring-scheduling-3.1:javaagent'
include ':instrumentation:spring:spring-web-3.1:library'
include ':instrumentation:spring:spring-webmvc-3.1:javaagent'