diff --git a/instrumentation/spring/spring-rabbit-1.0/javaagent/spring-rabbit-1.0-javaagent.gradle b/instrumentation/spring/spring-rabbit-1.0/javaagent/spring-rabbit-1.0-javaagent.gradle new file mode 100644 index 0000000000..625144efb4 --- /dev/null +++ b/instrumentation/spring/spring-rabbit-1.0/javaagent/spring-rabbit-1.0-javaagent.gradle @@ -0,0 +1,20 @@ +apply from: "$rootDir/gradle/instrumentation.gradle" + +muzzle { + pass { + group = 'org.springframework.amqp' + module = 'spring-rabbit' + versions = "(,)" + } +} + +dependencies { + library 'org.springframework.amqp:spring-rabbit:1.0.0.RELEASE' + + testInstrumentation project(':instrumentation:rabbitmq-2.7:javaagent') + + // 2.1.7 adds the @RabbitListener annotation, we need that for tests + testLibrary 'org.springframework.amqp:spring-rabbit:2.1.7.RELEASE' + testLibrary "org.springframework.boot:spring-boot-starter-test:1.5.22.RELEASE" + testLibrary "org.springframework.boot:spring-boot-starter:1.5.22.RELEASE" +} diff --git a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/AbstractMessageListenerContainerInstrumentation.java b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/AbstractMessageListenerContainerInstrumentation.java new file mode 100644 index 0000000000..d45ca334b9 --- /dev/null +++ b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/AbstractMessageListenerContainerInstrumentation.java @@ -0,0 +1,73 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.rabbit; + +import static io.opentelemetry.javaagent.instrumentation.spring.rabbit.SpringRabbitSingletons.instrumenter; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.springframework.amqp.core.Message; + +public class AbstractMessageListenerContainerInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + return named("org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("invokeListener") + .and( + takesArguments(2) + .and( + takesArgument(1, Object.class) + .or(takesArgument(1, named("org.springframework.amqp.core.Message"))))), + getClass().getName() + "$InvokeListenerAdvice"); + } + + @SuppressWarnings("unused") + public static class InvokeListenerAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(1) Object data, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelScope") Scope scope) { + if (!(data instanceof Message)) { + return; + } + + Context parentContext = Java8BytecodeBridge.currentContext(); + Message message = (Message) data; + if (instrumenter().shouldStart(parentContext, message)) { + context = instrumenter().start(parentContext, message); + scope = context.makeCurrent(); + } + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void onEnter( + @Advice.Argument(1) Object data, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelScope") Scope scope, + @Advice.Thrown Throwable throwable) { + if (scope == null || !(data instanceof Message)) { + return; + } + scope.close(); + instrumenter().end(context, (Message) data, null, throwable); + } + } +} diff --git a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/MessageHeaderGetter.java b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/MessageHeaderGetter.java new file mode 100644 index 0000000000..d1b10c93aa --- /dev/null +++ b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/MessageHeaderGetter.java @@ -0,0 +1,24 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.rabbit; + +import io.opentelemetry.context.propagation.TextMapGetter; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.springframework.amqp.core.Message; + +final class MessageHeaderGetter implements TextMapGetter { + @Override + public Iterable keys(Message carrier) { + return carrier.getMessageProperties().getHeaders().keySet(); + } + + @Nullable + @Override + public String get(Message carrier, String key) { + Object value = carrier.getMessageProperties().getHeaders().get(key); + return value == null ? null : value.toString(); + } +} diff --git a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/SpringRabbitInstrumentationModule.java b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/SpringRabbitInstrumentationModule.java new file mode 100644 index 0000000000..03d920cd02 --- /dev/null +++ b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/SpringRabbitInstrumentationModule.java @@ -0,0 +1,25 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.rabbit; + +import static java.util.Collections.singletonList; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import java.util.List; + +@AutoService(InstrumentationModule.class) +public class SpringRabbitInstrumentationModule extends InstrumentationModule { + public SpringRabbitInstrumentationModule() { + super("spring-rabbit", "spring-rabbit-1.0"); + } + + @Override + public List typeInstrumentations() { + return singletonList(new AbstractMessageListenerContainerInstrumentation()); + } +} diff --git a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/SpringRabbitMessageAttributesExtractor.java b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/SpringRabbitMessageAttributesExtractor.java new file mode 100644 index 0000000000..da71997e1a --- /dev/null +++ b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/SpringRabbitMessageAttributesExtractor.java @@ -0,0 +1,74 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.rabbit; + +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.springframework.amqp.core.Message; + +final class SpringRabbitMessageAttributesExtractor + extends MessagingAttributesExtractor { + @Override + protected String system(Message message) { + return "rabbitmq"; + } + + @Override + protected String destinationKind(Message message) { + return "queue"; + } + + @Override + protected @Nullable String destination(Message message) { + return message.getMessageProperties().getReceivedRoutingKey(); + } + + @Override + protected boolean temporaryDestination(Message message) { + return false; + } + + @Override + protected @Nullable String protocol(Message message) { + return null; + } + + @Override + protected @Nullable String protocolVersion(Message message) { + return null; + } + + @Override + protected @Nullable String url(Message message) { + return null; + } + + @Override + protected @Nullable String conversationId(Message message) { + return null; + } + + @Override + protected Long messagePayloadSize(Message message) { + return message.getMessageProperties().getContentLength(); + } + + @Override + protected @Nullable Long messagePayloadCompressedSize(Message message) { + return null; + } + + @Override + protected MessageOperation operation(Message message) { + return MessageOperation.PROCESS; + } + + @Override + protected @Nullable String messageId(Message message, @Nullable Void unused) { + return message.getMessageProperties().getMessageId(); + } +} diff --git a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/SpringRabbitSingletons.java b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/SpringRabbitSingletons.java new file mode 100644 index 0000000000..c2c316ecb5 --- /dev/null +++ b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/SpringRabbitSingletons.java @@ -0,0 +1,38 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.rabbit; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; +import org.springframework.amqp.core.Message; + +public final class SpringRabbitSingletons { + + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.javaagent.spring-rabbit-1.0"; + + private static final Instrumenter INSTRUMENTER; + + static { + SpringRabbitMessageAttributesExtractor attributesExtractor = + new SpringRabbitMessageAttributesExtractor(); + SpanNameExtractor spanNameExtractor = + MessagingSpanNameExtractor.create(attributesExtractor); + + INSTRUMENTER = + Instrumenter.newBuilder( + GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor) + .addAttributesExtractor(attributesExtractor) + .newConsumerInstrumenter(new MessageHeaderGetter()); + } + + public static Instrumenter instrumenter() { + return INSTRUMENTER; + } + + private SpringRabbitSingletons() {} +} diff --git a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/test/groovy/ContextPropagationTest.groovy b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/test/groovy/ContextPropagationTest.groovy new file mode 100644 index 0000000000..09cbb9dc5f --- /dev/null +++ b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/test/groovy/ContextPropagationTest.groovy @@ -0,0 +1,162 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import static io.opentelemetry.api.trace.SpanKind.CLIENT +import static io.opentelemetry.api.trace.SpanKind.CONSUMER +import static io.opentelemetry.api.trace.SpanKind.PRODUCER +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runInternalSpan +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace + +import com.rabbitmq.client.ConnectionFactory +import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import java.time.Duration +import org.springframework.amqp.core.AmqpTemplate +import org.springframework.amqp.core.Queue +import org.springframework.amqp.rabbit.annotation.RabbitListener +import org.springframework.boot.SpringApplication +import org.springframework.boot.SpringBootConfiguration +import org.springframework.boot.autoconfigure.EnableAutoConfiguration +import org.springframework.context.ConfigurableApplicationContext +import org.springframework.context.annotation.Bean +import org.testcontainers.containers.GenericContainer +import spock.lang.Shared + +class ContextPropagationTest extends AgentInstrumentationSpecification { + + @Shared + GenericContainer rabbitMqContainer + @Shared + ConfigurableApplicationContext applicationContext + @Shared + ConnectionFactory connectionFactory + + def setupSpec() { + rabbitMqContainer = new GenericContainer('rabbitmq:latest') + .withExposedPorts(5672) + .withStartupTimeout(Duration.ofSeconds(120)) + rabbitMqContainer.start() + + def app = new SpringApplication(ConsumerConfig) + app.setDefaultProperties([ + "spring.jmx.enabled" : false, + "spring.main.web-application-type": "none", + "spring.rabbitmq.host" : rabbitMqContainer.containerIpAddress, + "spring.rabbitmq.port" : rabbitMqContainer.getMappedPort(5672), + ]) + applicationContext = app.run() + + connectionFactory = new ConnectionFactory( + host: rabbitMqContainer.containerIpAddress, + port: rabbitMqContainer.getMappedPort(5672) + ) + } + + def cleanupSpec() { + rabbitMqContainer?.stop() + applicationContext?.close() + } + + def "should propagate context to consumer"() { + given: + def connection = connectionFactory.newConnection() + def channel = connection.createChannel() + + when: + runUnderTrace("parent") { + applicationContext.getBean(AmqpTemplate) + .convertAndSend(ConsumerConfig.TEST_QUEUE, "test") + } + + then: + assertTraces(2) { + trace(0, 5) { + span(0) { + name "parent" + } + span(1) { + // created by rabbitmq instrumentation + name " -> testQueue send" + kind PRODUCER + childOf span(0) + attributes { + "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" + "${SemanticAttributes.NET_PEER_PORT.key}" Long + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rabbitmq" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" "" + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "queue" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long + } + } + span(2) { + // created by rabbitmq instrumentation + name "testQueue process" + kind CONSUMER + childOf span(1) + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rabbitmq" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" "" + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "queue" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long + } + } + span(3) { + // created by spring-amqp instrumentation + name "testQueue process" + kind CONSUMER + childOf span(1) + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rabbitmq" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" "testQueue" + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "queue" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long + } + } + span(4) { + name "consumer" + childOf span(3) + } + } + trace(1, 1) { + span(0) { + // created by rabbitmq instrumentation + name "basic.ack" + kind CLIENT + attributes { + "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" + "${SemanticAttributes.NET_PEER_PORT.key}" Long + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rabbitmq" + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "queue" + } + } + } + } + + cleanup: + channel?.close() + connection?.close() + } + + @SpringBootConfiguration + @EnableAutoConfiguration + static class ConsumerConfig { + + static final String TEST_QUEUE = "testQueue" + + @Bean + Queue testQueue() { + new Queue(TEST_QUEUE) + } + + @RabbitListener(queues = TEST_QUEUE) + void consume(String ignored) { + runInternalSpan("consumer") + } + } +} diff --git a/settings.gradle b/settings.gradle index fc90b63319..7424da2595 100644 --- a/settings.gradle +++ b/settings.gradle @@ -282,6 +282,7 @@ include ':instrumentation:spring:spring-data-1.8:javaagent' include ':instrumentation:spring:spring-integration-4.1:javaagent' include ':instrumentation:spring:spring-integration-4.1:library' include ':instrumentation:spring:spring-integration-4.1:testing' +include ':instrumentation:spring:spring-rabbit-1.0:javaagent' include ':instrumentation:spring:spring-scheduling-3.1:javaagent' include ':instrumentation:spring:spring-web-3.1:library' include ':instrumentation:spring:spring-webmvc-3.1:javaagent'