Add (more) Spring JMS support (#6308)
* Add (more) Spring JMS support * Remove duplication * Better advice matcher
This commit is contained in:
parent
a559effa01
commit
2a59d0faee
|
@ -18,37 +18,20 @@ muzzle {
|
|||
|
||||
testSets {
|
||||
create("jms2Test")
|
||||
create("jms2TestReceiveSpansDisabled") {
|
||||
extendsFrom("jms2Test")
|
||||
}
|
||||
}
|
||||
|
||||
tasks {
|
||||
val testReceiveSpansDisabled by registering(Test::class) {
|
||||
filter {
|
||||
includeTestsMatching("SpringListenerJms1SuppressReceiveSpansTest")
|
||||
}
|
||||
include("**/SpringListenerJms1SuppressReceiveSpansTest.*")
|
||||
}
|
||||
|
||||
val jms2Test by existing(Test::class) {
|
||||
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
|
||||
}
|
||||
|
||||
val jms2TestReceiveSpansDisabled by existing
|
||||
|
||||
test {
|
||||
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].getService())
|
||||
filter {
|
||||
excludeTestsMatching("SpringListenerJms1SuppressReceiveSpansTest")
|
||||
}
|
||||
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
|
||||
}
|
||||
|
||||
check {
|
||||
dependsOn(testReceiveSpansDisabled)
|
||||
dependsOn(jms2Test)
|
||||
dependsOn(jms2TestReceiveSpansDisabled)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -60,18 +43,11 @@ dependencies {
|
|||
|
||||
compileOnly("javax.jms:jms-api:1.1-rev-1")
|
||||
|
||||
testImplementation("javax.annotation:javax.annotation-api:1.3.2")
|
||||
testImplementation("org.springframework.boot:spring-boot-starter-activemq:${versions["org.springframework.boot"]}")
|
||||
testImplementation("org.springframework.boot:spring-boot-starter-test:${versions["org.springframework.boot"]}") {
|
||||
exclude("org.junit.vintage", "junit-vintage-engine")
|
||||
}
|
||||
testImplementation("org.apache.activemq:activemq-client:5.16.5")
|
||||
|
||||
add("jms2TestImplementation", "org.hornetq:hornetq-jms-client:2.4.7.Final")
|
||||
add("jms2TestImplementation", "org.hornetq:hornetq-jms-server:2.4.7.Final") {
|
||||
// this doesn't exist in maven central, and doesn't seem to be needed anyways
|
||||
exclude("org.jboss.naming", "jnpserver")
|
||||
}
|
||||
|
||||
// this is just to avoid a bit more copy-pasting
|
||||
add("jms2TestReceiveSpansDisabledImplementation", sourceSets["jms2Test"].output)
|
||||
}
|
||||
|
|
|
@ -1,43 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
||||
import listener.Config
|
||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext
|
||||
import org.springframework.jms.core.JmsTemplate
|
||||
|
||||
import javax.jms.ConnectionFactory
|
||||
|
||||
import static Jms2Test.consumerSpan
|
||||
import static Jms2Test.producerSpan
|
||||
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
|
||||
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
|
||||
|
||||
class SpringListenerJms2Test extends AgentInstrumentationSpecification {
|
||||
def "receiving message in spring listener generates spans"() {
|
||||
setup:
|
||||
def context = new AnnotationConfigApplicationContext(Config)
|
||||
def factory = context.getBean(ConnectionFactory)
|
||||
def template = new JmsTemplate(factory)
|
||||
|
||||
template.convertAndSend("SpringListenerJms2", "a message")
|
||||
|
||||
expect:
|
||||
assertTraces(2) {
|
||||
traces.sort(orderByRootSpanKind(CONSUMER, PRODUCER))
|
||||
|
||||
trace(0, 1) {
|
||||
consumerSpan(it, 0, "queue", "SpringListenerJms2", "", null, "receive")
|
||||
}
|
||||
trace(1, 2) {
|
||||
producerSpan(it, 0, "queue", "SpringListenerJms2")
|
||||
consumerSpan(it, 1, "queue", "SpringListenerJms2", "", span(0), "process")
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
context.close()
|
||||
}
|
||||
}
|
|
@ -1,18 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package listener
|
||||
|
||||
import org.springframework.jms.annotation.JmsListener
|
||||
import org.springframework.stereotype.Component
|
||||
|
||||
@Component
|
||||
class TestListener {
|
||||
|
||||
@JmsListener(destination = "SpringListenerJms2", containerFactory = "containerFactory")
|
||||
void receiveMessage(String message) {
|
||||
println "received: " + message
|
||||
}
|
||||
}
|
|
@ -12,7 +12,8 @@ import java.util.logging.Logger;
|
|||
import javax.annotation.Nullable;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
enum JmsMessageAttributesGetter implements MessagingAttributesGetter<MessageWithDestination, Void> {
|
||||
public enum JmsMessageAttributesGetter
|
||||
implements MessagingAttributesGetter<MessageWithDestination, Void> {
|
||||
INSTANCE;
|
||||
|
||||
private static final Logger logger = Logger.getLogger(JmsMessageAttributesGetter.class.getName());
|
||||
|
|
|
@ -9,7 +9,7 @@ import io.opentelemetry.context.propagation.TextMapGetter;
|
|||
import java.util.Collections;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
enum MessagePropertyGetter implements TextMapGetter<MessageWithDestination> {
|
||||
public enum MessagePropertyGetter implements TextMapGetter<MessageWithDestination> {
|
||||
INSTANCE;
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,37 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
||||
import listener.Config
|
||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext
|
||||
import org.springframework.jms.core.JmsTemplate
|
||||
|
||||
import javax.jms.ConnectionFactory
|
||||
|
||||
import static Jms1Test.consumerSpan
|
||||
import static Jms1Test.producerSpan
|
||||
|
||||
class SpringListenerJms1SuppressReceiveSpansTest extends AgentInstrumentationSpecification {
|
||||
|
||||
def "receiving message in spring listener generates spans"() {
|
||||
setup:
|
||||
def context = new AnnotationConfigApplicationContext(Config)
|
||||
def factory = context.getBean(ConnectionFactory)
|
||||
def template = new JmsTemplate(factory)
|
||||
|
||||
template.convertAndSend("SpringListenerJms1", "a message")
|
||||
|
||||
expect:
|
||||
assertTraces(1) {
|
||||
trace(0, 2) {
|
||||
producerSpan(it, 0, "queue", "SpringListenerJms1")
|
||||
consumerSpan(it, 1, "queue", "SpringListenerJms1", "", span(0), "process")
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
context.stop()
|
||||
}
|
||||
}
|
|
@ -1,44 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
||||
import listener.Config
|
||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext
|
||||
import org.springframework.jms.core.JmsTemplate
|
||||
|
||||
import javax.jms.ConnectionFactory
|
||||
|
||||
import static Jms1Test.consumerSpan
|
||||
import static Jms1Test.producerSpan
|
||||
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
|
||||
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
|
||||
|
||||
class SpringListenerJms1Test extends AgentInstrumentationSpecification {
|
||||
|
||||
def "receiving message in spring listener generates spans"() {
|
||||
setup:
|
||||
def context = new AnnotationConfigApplicationContext(Config)
|
||||
def factory = context.getBean(ConnectionFactory)
|
||||
def template = new JmsTemplate(factory)
|
||||
|
||||
template.convertAndSend("SpringListenerJms1", "a message")
|
||||
|
||||
expect:
|
||||
assertTraces(2) {
|
||||
traces.sort(orderByRootSpanKind(CONSUMER, PRODUCER))
|
||||
|
||||
trace(0, 1) {
|
||||
consumerSpan(it, 0, "queue", "SpringListenerJms1", "", null, "receive")
|
||||
}
|
||||
trace(1, 2) {
|
||||
producerSpan(it, 0, "queue", "SpringListenerJms1")
|
||||
consumerSpan(it, 1, "queue", "SpringListenerJms1", "", span(0), "process")
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
context.stop()
|
||||
}
|
||||
}
|
|
@ -1,128 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
||||
import org.apache.activemq.ActiveMQConnectionFactory
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import org.springframework.jms.core.JmsTemplate
|
||||
import org.testcontainers.containers.GenericContainer
|
||||
import org.testcontainers.containers.output.Slf4jLogConsumer
|
||||
import spock.lang.Shared
|
||||
|
||||
import javax.jms.Connection
|
||||
import javax.jms.Session
|
||||
import javax.jms.TextMessage
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
import static Jms1Test.consumerSpan
|
||||
import static Jms1Test.producerSpan
|
||||
|
||||
class SpringTemplateJms1Test extends AgentInstrumentationSpecification {
|
||||
private static final Logger logger = LoggerFactory.getLogger("io.opentelemetry.SpringTemplateJms1Test")
|
||||
|
||||
private static final GenericContainer broker = new GenericContainer("rmohr/activemq:latest")
|
||||
.withExposedPorts(61616, 8161)
|
||||
.withLogConsumer(new Slf4jLogConsumer(logger))
|
||||
|
||||
@Shared
|
||||
String messageText = "a message"
|
||||
@Shared
|
||||
JmsTemplate template
|
||||
@Shared
|
||||
Session session
|
||||
|
||||
def setupSpec() {
|
||||
broker.start()
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + broker.getMappedPort(61616))
|
||||
// to avoid InvalidDestinationException in "send and receive message generates spans"
|
||||
// see https://issues.apache.org/jira/browse/AMQ-6155
|
||||
connectionFactory.setWatchTopicAdvisories(false)
|
||||
Connection connection = connectionFactory.createConnection()
|
||||
connection.start()
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
|
||||
|
||||
template = new JmsTemplate(connectionFactory)
|
||||
// Make this longer than timeout on testWriter.waitForTraces
|
||||
// Otherwise caller might give up waiting before callee has a chance to respond.
|
||||
template.receiveTimeout = TimeUnit.SECONDS.toMillis(21)
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
broker.stop()
|
||||
}
|
||||
|
||||
def "sending a message to #destinationName generates spans"() {
|
||||
setup:
|
||||
template.convertAndSend(destination, messageText)
|
||||
TextMessage receivedMessage = template.receive(destination)
|
||||
|
||||
expect:
|
||||
receivedMessage.text == messageText
|
||||
assertTraces(2) {
|
||||
trace(0, 1) {
|
||||
producerSpan(it, 0, destinationType, destinationName)
|
||||
}
|
||||
trace(1, 1) {
|
||||
consumerSpan(it, 0, destinationType, destinationName, receivedMessage.getJMSMessageID(), null, "receive")
|
||||
}
|
||||
}
|
||||
|
||||
where:
|
||||
destination | destinationType | destinationName
|
||||
session.createQueue("SpringTemplateJms1") | "queue" | "SpringTemplateJms1"
|
||||
}
|
||||
|
||||
def "send and receive message generates spans"() {
|
||||
setup:
|
||||
AtomicReference<String> msgId = new AtomicReference<>()
|
||||
Thread.start {
|
||||
logger.info("calling receive")
|
||||
TextMessage msg = template.receive(destination)
|
||||
assert msg.text == messageText
|
||||
msgId.set(msg.getJMSMessageID())
|
||||
|
||||
logger.info("calling send")
|
||||
template.send(msg.getJMSReplyTo()) {
|
||||
session -> template.getMessageConverter().toMessage("responded!", session)
|
||||
}
|
||||
}
|
||||
logger.info("calling sendAndReceive")
|
||||
def receivedMessage = template.sendAndReceive(destination) {
|
||||
session -> template.getMessageConverter().toMessage(messageText, session)
|
||||
}
|
||||
logger.info("received message " + receivedMessage)
|
||||
|
||||
expect:
|
||||
receivedMessage != null
|
||||
receivedMessage.text == "responded!"
|
||||
assertTraces(4) {
|
||||
traces.sort(orderByRootSpanName(
|
||||
"$destinationName receive",
|
||||
"$destinationName send",
|
||||
"(temporary) receive",
|
||||
"(temporary) send"))
|
||||
|
||||
trace(0, 1) {
|
||||
consumerSpan(it, 0, destinationType, destinationName, msgId.get(), null, "receive")
|
||||
}
|
||||
trace(1, 1) {
|
||||
producerSpan(it, 0, destinationType, destinationName)
|
||||
}
|
||||
trace(2, 1) {
|
||||
consumerSpan(it, 0, "queue", "(temporary)", receivedMessage.getJMSMessageID(), null, "receive")
|
||||
}
|
||||
trace(3, 1) {
|
||||
// receive doesn't propagate the trace, so this is a root
|
||||
producerSpan(it, 0, "queue", "(temporary)")
|
||||
}
|
||||
}
|
||||
|
||||
where:
|
||||
destination | destinationType | destinationName
|
||||
session.createQueue("SpringTemplateJms1") | "queue" | "SpringTemplateJms1"
|
||||
}
|
||||
}
|
|
@ -1,52 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package listener
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory
|
||||
import org.springframework.context.annotation.Bean
|
||||
import org.springframework.context.annotation.ComponentScan
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.jms.annotation.EnableJms
|
||||
import org.springframework.jms.config.DefaultJmsListenerContainerFactory
|
||||
import org.springframework.jms.config.JmsListenerContainerFactory
|
||||
import org.testcontainers.containers.GenericContainer
|
||||
import org.testcontainers.containers.wait.strategy.Wait
|
||||
|
||||
import javax.annotation.PreDestroy
|
||||
import javax.jms.ConnectionFactory
|
||||
import java.time.Duration
|
||||
|
||||
@Configuration
|
||||
@ComponentScan
|
||||
@EnableJms
|
||||
class Config {
|
||||
|
||||
private static GenericContainer broker = new GenericContainer("rmohr/activemq:latest")
|
||||
.withExposedPorts(61616, 8161)
|
||||
.waitingFor(Wait.forLogMessage(".*Apache ActiveMQ .* started.*", 1))
|
||||
.withStartupTimeout(Duration.ofMinutes(2))
|
||||
|
||||
static {
|
||||
broker.start()
|
||||
}
|
||||
|
||||
@Bean
|
||||
ConnectionFactory connectionFactory() {
|
||||
return new ActiveMQConnectionFactory("tcp://localhost:" + broker.getMappedPort(61616))
|
||||
}
|
||||
|
||||
@Bean
|
||||
JmsListenerContainerFactory<?> containerFactory(ConnectionFactory connectionFactory) {
|
||||
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory()
|
||||
factory.setConnectionFactory(connectionFactory)
|
||||
return factory
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
void destroy() {
|
||||
broker.stop()
|
||||
}
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
plugins {
|
||||
id("otel.javaagent-instrumentation")
|
||||
id("org.unbroken-dome.test-sets")
|
||||
}
|
||||
|
||||
muzzle {
|
||||
pass {
|
||||
group.set("org.springframework")
|
||||
module.set("spring-jms")
|
||||
versions.set("[2.0,)")
|
||||
extraDependency("javax.jms:jms-api:1.1-rev-1")
|
||||
assertInverse.set(true)
|
||||
}
|
||||
}
|
||||
|
||||
testSets {
|
||||
create("testReceiveSpansDisabled")
|
||||
}
|
||||
|
||||
tasks {
|
||||
test {
|
||||
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].getService())
|
||||
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
|
||||
}
|
||||
|
||||
val testReceiveSpansDisabled by existing
|
||||
|
||||
check {
|
||||
dependsOn(testReceiveSpansDisabled)
|
||||
}
|
||||
}
|
||||
|
||||
val versions: Map<String, String> by project
|
||||
|
||||
dependencies {
|
||||
implementation(project(":instrumentation:jms-1.1:javaagent"))
|
||||
library("org.springframework:spring-jms:2.0")
|
||||
compileOnly("javax.jms:jms-api:1.1-rev-1")
|
||||
|
||||
compileOnly("com.google.auto.value:auto-value-annotations")
|
||||
annotationProcessor("com.google.auto.value:auto-value")
|
||||
|
||||
testInstrumentation(project(":instrumentation:jms-1.1:javaagent"))
|
||||
|
||||
testImplementation("org.springframework.boot:spring-boot-starter-activemq:${versions["org.springframework.boot"]}")
|
||||
testImplementation("org.springframework.boot:spring-boot-starter-test:${versions["org.springframework.boot"]}") {
|
||||
exclude("org.junit.vintage", "junit-vintage-engine")
|
||||
}
|
||||
|
||||
testImplementation("org.hornetq:hornetq-jms-client:2.4.7.Final")
|
||||
testImplementation("org.hornetq:hornetq-jms-server:2.4.7.Final") {
|
||||
// this doesn't exist in maven central, and doesn't seem to be needed anyways
|
||||
exclude("org.jboss.naming", "jnpserver")
|
||||
}
|
||||
|
||||
// this is just to avoid a bit more copy-pasting
|
||||
add("testReceiveSpansDisabledImplementation", sourceSets["test"].output)
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.jms;
|
||||
|
||||
import static java.util.Collections.singletonList;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import java.util.List;
|
||||
|
||||
@AutoService(InstrumentationModule.class)
|
||||
public class SpringJmsInstrumentationModule extends InstrumentationModule {
|
||||
public SpringJmsInstrumentationModule() {
|
||||
super("spring-jms", "spring-jms-2.0");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TypeInstrumentation> typeInstrumentations() {
|
||||
return singletonList(new SpringJmsMessageListenerInstrumentation());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.jms;
|
||||
|
||||
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
|
||||
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
|
||||
import static io.opentelemetry.javaagent.instrumentation.spring.jms.SpringJmsSingletons.listenerInstrumenter;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import io.opentelemetry.javaagent.instrumentation.jms.MessageWithDestination;
|
||||
import javax.jms.Message;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
||||
public class SpringJmsMessageListenerInstrumentation implements TypeInstrumentation {
|
||||
|
||||
@Override
|
||||
public ElementMatcher<ClassLoader> classLoaderOptimization() {
|
||||
return hasClassesNamed("org.springframework.jms.listener.SessionAwareMessageListener");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return implementsInterface(
|
||||
named("org.springframework.jms.listener.SessionAwareMessageListener"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
named("onMessage")
|
||||
.and(isPublic())
|
||||
.and(takesArguments(2))
|
||||
.and(takesArgument(0, named("javax.jms.Message"))),
|
||||
SpringJmsMessageListenerInstrumentation.class.getName() + "$MessageListenerAdvice");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class MessageListenerAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static void onEnter(
|
||||
@Advice.Argument(0) Message message,
|
||||
@Advice.Local("otelRequest") MessageWithDestination request,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelScope") Scope scope) {
|
||||
|
||||
Context parentContext = Java8BytecodeBridge.currentContext();
|
||||
request = MessageWithDestination.create(message, null);
|
||||
|
||||
if (!listenerInstrumenter().shouldStart(parentContext, request)) {
|
||||
return;
|
||||
}
|
||||
|
||||
context = listenerInstrumenter().start(parentContext, request);
|
||||
scope = context.makeCurrent();
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void stopSpan(
|
||||
@Advice.Local("otelRequest") MessageWithDestination request,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelScope") Scope scope,
|
||||
@Advice.Thrown Throwable throwable) {
|
||||
if (scope == null) {
|
||||
return;
|
||||
}
|
||||
scope.close();
|
||||
listenerInstrumenter().end(context, request, null, throwable);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.jms;
|
||||
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
|
||||
import io.opentelemetry.javaagent.instrumentation.jms.JmsMessageAttributesGetter;
|
||||
import io.opentelemetry.javaagent.instrumentation.jms.MessagePropertyGetter;
|
||||
import io.opentelemetry.javaagent.instrumentation.jms.MessageWithDestination;
|
||||
|
||||
public final class SpringJmsSingletons {
|
||||
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-jms-2.0";
|
||||
|
||||
private static final Instrumenter<MessageWithDestination, Void> LISTENER_INSTRUMENTER =
|
||||
buildListenerInstrumenter();
|
||||
|
||||
private static Instrumenter<MessageWithDestination, Void> buildListenerInstrumenter() {
|
||||
JmsMessageAttributesGetter getter = JmsMessageAttributesGetter.INSTANCE;
|
||||
MessageOperation operation = MessageOperation.PROCESS;
|
||||
|
||||
return Instrumenter.<MessageWithDestination, Void>builder(
|
||||
GlobalOpenTelemetry.get(),
|
||||
INSTRUMENTATION_NAME,
|
||||
MessagingSpanNameExtractor.create(getter, operation))
|
||||
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
|
||||
.newConsumerInstrumenter(MessagePropertyGetter.INSTANCE);
|
||||
}
|
||||
|
||||
public static Instrumenter<MessageWithDestination, Void> listenerInstrumenter() {
|
||||
return LISTENER_INSTRUMENTER;
|
||||
}
|
||||
|
||||
private SpringJmsSingletons() {}
|
||||
}
|
|
@ -0,0 +1,93 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
||||
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
|
||||
import io.opentelemetry.sdk.trace.data.SpanData
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
||||
import listener.AnnotatedListenerConfig
|
||||
import listener.ManualListenerConfig
|
||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext
|
||||
import org.springframework.jms.core.JmsTemplate
|
||||
|
||||
import javax.jms.ConnectionFactory
|
||||
|
||||
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
|
||||
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
|
||||
|
||||
class SpringListenerTest extends AgentInstrumentationSpecification {
|
||||
def "receiving message in spring listener generates spans"() {
|
||||
setup:
|
||||
def context = new AnnotationConfigApplicationContext(config)
|
||||
def factory = context.getBean(ConnectionFactory)
|
||||
def template = new JmsTemplate(factory)
|
||||
|
||||
template.convertAndSend("SpringListenerJms2", "a message")
|
||||
|
||||
expect:
|
||||
assertTraces(2) {
|
||||
traces.sort(orderByRootSpanKind(CONSUMER, PRODUCER))
|
||||
|
||||
trace(0, 1) {
|
||||
consumerSpan(it, 0, "queue", "SpringListenerJms2", "", null, "receive")
|
||||
}
|
||||
trace(1, 2) {
|
||||
producerSpan(it, 0, "queue", "SpringListenerJms2")
|
||||
consumerSpan(it, 1, "queue", "SpringListenerJms2", "", span(0), "process")
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
context.close()
|
||||
|
||||
where:
|
||||
config << [AnnotatedListenerConfig, ManualListenerConfig]
|
||||
}
|
||||
|
||||
static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) {
|
||||
trace.span(index) {
|
||||
name destinationName + " send"
|
||||
kind PRODUCER
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"$SemanticAttributes.MESSAGING_SYSTEM" "jms"
|
||||
"$SemanticAttributes.MESSAGING_DESTINATION" destinationName
|
||||
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" destinationType
|
||||
if (destinationName == "(temporary)") {
|
||||
"$SemanticAttributes.MESSAGING_TEMP_DESTINATION" true
|
||||
}
|
||||
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// passing messageId = null will verify message.id is not captured,
|
||||
// passing messageId = "" will verify message.id is captured (but won't verify anything about the value),
|
||||
// any other value for messageId will verify that message.id is captured and has that same value
|
||||
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation) {
|
||||
trace.span(index) {
|
||||
name destinationName + " " + operation
|
||||
kind CONSUMER
|
||||
if (parentOrLinkedSpan != null) {
|
||||
childOf((SpanData) parentOrLinkedSpan)
|
||||
} else {
|
||||
hasNoParent()
|
||||
}
|
||||
attributes {
|
||||
"$SemanticAttributes.MESSAGING_SYSTEM" "jms"
|
||||
"$SemanticAttributes.MESSAGING_DESTINATION" destinationName
|
||||
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" destinationType
|
||||
"$SemanticAttributes.MESSAGING_OPERATION" operation
|
||||
if (messageId != null) {
|
||||
//In some tests we don't know exact messageId, so we pass "" and verify just the existence of the attribute
|
||||
"$SemanticAttributes.MESSAGING_MESSAGE_ID" { it == messageId || messageId == "" }
|
||||
}
|
||||
if (destinationName == "(temporary)") {
|
||||
"$SemanticAttributes.MESSAGING_TEMP_DESTINATION" true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -5,7 +5,6 @@
|
|||
|
||||
import com.google.common.io.Files
|
||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
||||
import javax.jms.Connection
|
||||
import org.hornetq.api.core.TransportConfiguration
|
||||
import org.hornetq.api.core.client.HornetQClient
|
||||
import org.hornetq.api.jms.HornetQJMSClient
|
||||
|
@ -20,15 +19,16 @@ import org.hornetq.core.server.HornetQServers
|
|||
import org.springframework.jms.core.JmsTemplate
|
||||
import spock.lang.Shared
|
||||
|
||||
import javax.jms.Connection
|
||||
import javax.jms.Session
|
||||
import javax.jms.TextMessage
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
import static Jms2Test.consumerSpan
|
||||
import static Jms2Test.producerSpan
|
||||
import static SpringListenerTest.consumerSpan
|
||||
import static SpringListenerTest.producerSpan
|
||||
|
||||
class SpringTemplateJms2Test extends AgentInstrumentationSpecification {
|
||||
class SpringTemplateTest extends AgentInstrumentationSpecification {
|
||||
@Shared
|
||||
HornetQServer server
|
||||
@Shared
|
|
@ -17,19 +17,13 @@ import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory
|
|||
import org.hornetq.core.server.HornetQServer
|
||||
import org.hornetq.core.server.HornetQServers
|
||||
import org.springframework.context.annotation.Bean
|
||||
import org.springframework.context.annotation.ComponentScan
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.jms.annotation.EnableJms
|
||||
import org.springframework.jms.config.DefaultJmsListenerContainerFactory
|
||||
import org.springframework.jms.config.JmsListenerContainerFactory
|
||||
|
||||
import javax.annotation.PreDestroy
|
||||
import javax.jms.ConnectionFactory
|
||||
|
||||
@Configuration
|
||||
@ComponentScan
|
||||
@EnableJms
|
||||
class Config {
|
||||
class AbstractConfig {
|
||||
|
||||
private HornetQServer server
|
||||
|
||||
|
@ -64,7 +58,7 @@ class Config {
|
|||
}
|
||||
|
||||
@Bean
|
||||
JmsListenerContainerFactory<?> containerFactory(ConnectionFactory connectionFactory) {
|
||||
JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
|
||||
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory()
|
||||
factory.setConnectionFactory(connectionFactory)
|
||||
return factory
|
|
@ -0,0 +1,14 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package listener
|
||||
|
||||
import org.springframework.context.annotation.ComponentScan
|
||||
import org.springframework.jms.annotation.EnableJms
|
||||
|
||||
@ComponentScan
|
||||
@EnableJms
|
||||
class AnnotatedListenerConfig extends AbstractConfig {
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package listener
|
||||
|
||||
|
||||
import org.springframework.jms.annotation.EnableJms
|
||||
import org.springframework.jms.annotation.JmsListenerConfigurer
|
||||
import org.springframework.jms.config.JmsListenerEndpoint
|
||||
import org.springframework.jms.config.JmsListenerEndpointRegistrar
|
||||
import org.springframework.jms.listener.AbstractMessageListenerContainer
|
||||
import org.springframework.jms.listener.MessageListenerContainer
|
||||
import org.springframework.jms.listener.SessionAwareMessageListener
|
||||
|
||||
import javax.jms.JMSException
|
||||
import javax.jms.Message
|
||||
import javax.jms.Session
|
||||
|
||||
@EnableJms
|
||||
class ManualListenerConfig extends AbstractConfig implements JmsListenerConfigurer {
|
||||
|
||||
@Override
|
||||
void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
|
||||
registrar.registerEndpoint(new JmsListenerEndpoint() {
|
||||
@Override
|
||||
String getId() {
|
||||
return "testid"
|
||||
}
|
||||
|
||||
@Override
|
||||
void setupListenerContainer(MessageListenerContainer listenerContainer) {
|
||||
var container = (AbstractMessageListenerContainer) listenerContainer
|
||||
container.setDestinationName("SpringListenerJms2")
|
||||
container.setupMessageListener(new SessionAwareMessageListener<Message>() {
|
||||
@Override
|
||||
void onMessage(Message message, Session session) throws JMSException {
|
||||
println "received: " + message
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -11,7 +11,7 @@ import org.springframework.stereotype.Component
|
|||
@Component
|
||||
class TestListener {
|
||||
|
||||
@JmsListener(destination = "SpringListenerJms1", containerFactory = "containerFactory")
|
||||
@JmsListener(destination = "SpringListenerJms2")
|
||||
void receiveMessage(String message) {
|
||||
println "received: " + message
|
||||
}
|
|
@ -4,16 +4,16 @@
|
|||
*/
|
||||
|
||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
||||
import listener.Config
|
||||
import listener.AnnotatedListenerConfig
|
||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext
|
||||
import org.springframework.jms.core.JmsTemplate
|
||||
|
||||
import javax.jms.ConnectionFactory
|
||||
|
||||
class SpringListenerJms2SuppressReceiveSpansTest extends AgentInstrumentationSpecification {
|
||||
class SpringListenerSuppressReceiveSpansTest extends AgentInstrumentationSpecification {
|
||||
def "receiving message in spring listener generates spans"() {
|
||||
setup:
|
||||
def context = new AnnotationConfigApplicationContext(Config)
|
||||
def context = new AnnotationConfigApplicationContext(AnnotatedListenerConfig)
|
||||
def factory = context.getBean(ConnectionFactory)
|
||||
def template = new JmsTemplate(factory)
|
||||
|
||||
|
@ -22,8 +22,8 @@ class SpringListenerJms2SuppressReceiveSpansTest extends AgentInstrumentationSpe
|
|||
expect:
|
||||
assertTraces(1) {
|
||||
trace(0, 2) {
|
||||
Jms2Test.producerSpan(it, 0, "queue", "SpringListenerJms2")
|
||||
Jms2Test.consumerSpan(it, 1, "queue", "SpringListenerJms2", "", span(0), "process")
|
||||
SpringListenerTest.producerSpan(it, 0, "queue", "SpringListenerJms2")
|
||||
SpringListenerTest.consumerSpan(it, 1, "queue", "SpringListenerJms2", "", span(0), "process")
|
||||
}
|
||||
}
|
||||
|
|
@ -427,6 +427,7 @@ include(":instrumentation:spring:spring-data-1.8:javaagent")
|
|||
include(":instrumentation:spring:spring-integration-4.1:javaagent")
|
||||
include(":instrumentation:spring:spring-integration-4.1:library")
|
||||
include(":instrumentation:spring:spring-integration-4.1:testing")
|
||||
include(":instrumentation:spring:spring-jms-2.0:javaagent")
|
||||
include(":instrumentation:spring:spring-kafka-2.7:javaagent")
|
||||
include(":instrumentation:spring:spring-kafka-2.7:testing")
|
||||
include(":instrumentation:spring:spring-rabbit-1.0:javaagent")
|
||||
|
|
Loading…
Reference in New Issue