Support Spring JMS 6.0 (#7438)
Part of #7203 The instrumentation is 100% copy-pasted, tests are rewritten from scratch in Java because of way too many class name changes. Depends on https://github.com/open-telemetry/opentelemetry-java-instrumentation/pull/7418 Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
This commit is contained in:
parent
18cffb7aeb
commit
fb5cf9fd97
|
@ -7,8 +7,9 @@ muzzle {
|
|||
pass {
|
||||
group.set("org.springframework")
|
||||
module.set("spring-jms")
|
||||
versions.set("[2.0,)")
|
||||
versions.set("[2.0,6)")
|
||||
extraDependency("javax.jms:jms-api:1.1-rev-1")
|
||||
excludeInstrumentationName("jms")
|
||||
assertInverse.set(true)
|
||||
}
|
||||
}
|
|
@ -3,21 +3,30 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.jms;
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.jms.v2_0;
|
||||
|
||||
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
|
||||
import static java.util.Collections.singletonList;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import java.util.List;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
||||
@AutoService(InstrumentationModule.class)
|
||||
public class SpringJmsInstrumentationModule extends InstrumentationModule {
|
||||
|
||||
public SpringJmsInstrumentationModule() {
|
||||
super("spring-jms", "spring-jms-2.0");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
|
||||
// introduced in 2.0, removed in 6.0
|
||||
return hasClassesNamed("org.springframework.jms.remoting.JmsInvokerProxyFactoryBean");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TypeInstrumentation> typeInstrumentations() {
|
||||
return singletonList(new SpringJmsMessageListenerInstrumentation());
|
|
@ -3,11 +3,11 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.jms;
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.jms.v2_0;
|
||||
|
||||
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
|
||||
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
|
||||
import static io.opentelemetry.javaagent.instrumentation.spring.jms.SpringJmsSingletons.listenerInstrumenter;
|
||||
import static io.opentelemetry.javaagent.instrumentation.spring.jms.v2_0.SpringJmsSingletons.listenerInstrumenter;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
|
@ -3,7 +3,7 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.jms;
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.jms.v2_0;
|
||||
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
|
@ -0,0 +1,42 @@
|
|||
plugins {
|
||||
id("otel.javaagent-instrumentation")
|
||||
}
|
||||
|
||||
muzzle {
|
||||
pass {
|
||||
group.set("org.springframework")
|
||||
module.set("spring-jms")
|
||||
versions.set("[6.0.0,)")
|
||||
extraDependency("jakarta.jms:jakarta.jms-api:3.0.0")
|
||||
excludeInstrumentationName("jms")
|
||||
assertInverse.set(true)
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
implementation(project(":instrumentation:jms:jms-common:javaagent"))
|
||||
implementation(project(":instrumentation:jms:jms-3.0:javaagent"))
|
||||
|
||||
library("org.springframework:spring-jms:6.0.0")
|
||||
compileOnly("jakarta.jms:jakarta.jms-api:3.0.0")
|
||||
|
||||
testInstrumentation(project(":instrumentation:jms:jms-3.0:javaagent"))
|
||||
|
||||
testImplementation("org.apache.activemq:artemis-jakarta-client:2.27.1")
|
||||
|
||||
testLibrary("org.springframework.boot:spring-boot-starter-test:3.0.0")
|
||||
testLibrary("org.springframework.boot:spring-boot-starter:3.0.0")
|
||||
}
|
||||
|
||||
// spring 6 requires java 17
|
||||
otelJava {
|
||||
minJavaVersionSupported.set(JavaVersion.VERSION_17)
|
||||
}
|
||||
|
||||
tasks {
|
||||
test {
|
||||
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
|
||||
|
||||
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.jms.v6_0;
|
||||
|
||||
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.not;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import java.util.List;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
||||
@AutoService(InstrumentationModule.class)
|
||||
public class SpringJmsInstrumentationModule extends InstrumentationModule {
|
||||
|
||||
public SpringJmsInstrumentationModule() {
|
||||
super("spring-jms", "spring-jms-6.0");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
|
||||
// removed in 6.0
|
||||
return not(hasClassesNamed("org.springframework.jms.remoting.JmsInvokerProxyFactoryBean"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TypeInstrumentation> typeInstrumentations() {
|
||||
return singletonList(new SpringJmsMessageListenerInstrumentation());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.jms.v6_0;
|
||||
|
||||
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
|
||||
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
|
||||
import static io.opentelemetry.javaagent.instrumentation.spring.jms.v6_0.SpringJmsSingletons.listenerInstrumenter;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import io.opentelemetry.javaagent.instrumentation.jms.MessageWithDestination;
|
||||
import io.opentelemetry.javaagent.instrumentation.jms.v3_0.JakartaMessageAdapter;
|
||||
import jakarta.jms.Message;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
||||
public class SpringJmsMessageListenerInstrumentation implements TypeInstrumentation {
|
||||
|
||||
@Override
|
||||
public ElementMatcher<ClassLoader> classLoaderOptimization() {
|
||||
return hasClassesNamed("org.springframework.jms.listener.SessionAwareMessageListener");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return implementsInterface(
|
||||
named("org.springframework.jms.listener.SessionAwareMessageListener"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
named("onMessage")
|
||||
.and(isPublic())
|
||||
.and(takesArguments(2))
|
||||
.and(takesArgument(0, named("jakarta.jms.Message"))),
|
||||
SpringJmsMessageListenerInstrumentation.class.getName() + "$MessageListenerAdvice");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class MessageListenerAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static void onEnter(
|
||||
@Advice.Argument(0) Message message,
|
||||
@Advice.Local("otelRequest") MessageWithDestination request,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelScope") Scope scope) {
|
||||
|
||||
Context parentContext = Java8BytecodeBridge.currentContext();
|
||||
request = MessageWithDestination.create(JakartaMessageAdapter.create(message), null);
|
||||
|
||||
if (!listenerInstrumenter().shouldStart(parentContext, request)) {
|
||||
return;
|
||||
}
|
||||
|
||||
context = listenerInstrumenter().start(parentContext, request);
|
||||
scope = context.makeCurrent();
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void stopSpan(
|
||||
@Advice.Local("otelRequest") MessageWithDestination request,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelScope") Scope scope,
|
||||
@Advice.Thrown Throwable throwable) {
|
||||
if (scope == null) {
|
||||
return;
|
||||
}
|
||||
scope.close();
|
||||
listenerInstrumenter().end(context, request, null, throwable);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.jms.v6_0;
|
||||
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
|
||||
import io.opentelemetry.javaagent.instrumentation.jms.JmsInstrumenterFactory;
|
||||
import io.opentelemetry.javaagent.instrumentation.jms.MessageWithDestination;
|
||||
|
||||
public final class SpringJmsSingletons {
|
||||
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-jms-6.0";
|
||||
|
||||
private static final Instrumenter<MessageWithDestination, Void> LISTENER_INSTRUMENTER =
|
||||
new JmsInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME)
|
||||
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
|
||||
.createConsumerProcessInstrumenter();
|
||||
|
||||
public static Instrumenter<MessageWithDestination, Void> listenerInstrumenter() {
|
||||
return LISTENER_INSTRUMENTER;
|
||||
}
|
||||
|
||||
private SpringJmsSingletons() {}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.jms.v6_0;
|
||||
|
||||
import jakarta.jms.ConnectionFactory;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
|
||||
import org.springframework.jms.config.JmsListenerContainerFactory;
|
||||
|
||||
abstract class AbstractConfig {
|
||||
|
||||
@Bean
|
||||
public ConnectionFactory connectionFactory(@Value("${test.broker-url}") String artemisBrokerUrl) {
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(artemisBrokerUrl);
|
||||
connectionFactory.setUser("test");
|
||||
connectionFactory.setPassword("test");
|
||||
return connectionFactory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public JmsListenerContainerFactory<?> jmsListenerContainerFactory(
|
||||
ConnectionFactory connectionFactory) {
|
||||
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
|
||||
factory.setConnectionFactory(connectionFactory);
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public CompletableFuture<String> receivedMessage() {
|
||||
return new CompletableFuture<>();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.jms.v6_0;
|
||||
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.jms.annotation.EnableJms;
|
||||
|
||||
@ComponentScan
|
||||
@EnableJms
|
||||
public class AnnotatedListenerConfig extends AbstractConfig {}
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.jms.v6_0;
|
||||
|
||||
import static io.opentelemetry.instrumentation.testing.GlobalTraceUtil.runWithSpan;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.jms.annotation.JmsListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class AnnotatedTestListener {
|
||||
|
||||
private final CompletableFuture<String> receivedMessage;
|
||||
|
||||
@Autowired
|
||||
public AnnotatedTestListener(CompletableFuture<String> receivedMessage) {
|
||||
this.receivedMessage = receivedMessage;
|
||||
}
|
||||
|
||||
@JmsListener(destination = "spring-jms-listener")
|
||||
public void receiveMessage(String message) {
|
||||
runWithSpan("consumer", () -> receivedMessage.complete(message));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.jms.v6_0;
|
||||
|
||||
import static io.opentelemetry.instrumentation.testing.GlobalTraceUtil.runWithSpan;
|
||||
|
||||
import jakarta.jms.TextMessage;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.jms.annotation.EnableJms;
|
||||
import org.springframework.jms.annotation.JmsListenerConfigurer;
|
||||
import org.springframework.jms.config.JmsListenerEndpoint;
|
||||
import org.springframework.jms.listener.AbstractMessageListenerContainer;
|
||||
import org.springframework.jms.listener.MessageListenerContainer;
|
||||
import org.springframework.jms.listener.SessionAwareMessageListener;
|
||||
|
||||
@EnableJms
|
||||
public class ManualListenerConfig extends AbstractConfig {
|
||||
|
||||
@Bean
|
||||
public JmsListenerConfigurer jmsListenerConfigurer(CompletableFuture<String> receivedMessage) {
|
||||
return registrar ->
|
||||
registrar.registerEndpoint(
|
||||
new JmsListenerEndpoint() {
|
||||
@Override
|
||||
public String getId() {
|
||||
return "testid";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setupListenerContainer(MessageListenerContainer listenerContainer) {
|
||||
AbstractMessageListenerContainer container =
|
||||
(AbstractMessageListenerContainer) listenerContainer;
|
||||
container.setDestinationName("spring-jms-listener");
|
||||
container.setupMessageListener(
|
||||
(SessionAwareMessageListener<TextMessage>)
|
||||
(message, session) ->
|
||||
runWithSpan(
|
||||
"consumer", () -> receivedMessage.complete(message.getText())));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
|
@ -0,0 +1,261 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.jms.v6_0;
|
||||
|
||||
import static io.opentelemetry.api.common.AttributeKey.stringArrayKey;
|
||||
import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
|
||||
import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
|
||||
import static io.opentelemetry.api.trace.SpanKind.PRODUCER;
|
||||
import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;
|
||||
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
|
||||
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.params.provider.Arguments.arguments;
|
||||
|
||||
import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension;
|
||||
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
|
||||
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
|
||||
import jakarta.jms.ConnectionFactory;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.stream.Stream;
|
||||
import org.assertj.core.api.AbstractStringAssert;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.extension.ExtensionContext;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.ArgumentsProvider;
|
||||
import org.junit.jupiter.params.provider.ArgumentsSource;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.jms.core.JmsTemplate;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.output.Slf4jLogConsumer;
|
||||
|
||||
class SpringJmsListenerTest {
|
||||
|
||||
static final Logger logger = LoggerFactory.getLogger(SpringJmsListenerTest.class);
|
||||
|
||||
@RegisterExtension
|
||||
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
|
||||
|
||||
@RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create();
|
||||
|
||||
static GenericContainer<?> broker;
|
||||
|
||||
@BeforeAll
|
||||
static void setUp() {
|
||||
broker =
|
||||
new GenericContainer<>("quay.io/artemiscloud/activemq-artemis-broker:artemis.2.27.0")
|
||||
.withEnv("AMQ_USER", "test")
|
||||
.withEnv("AMQ_PASSWORD", "test")
|
||||
.withExposedPorts(61616, 8161)
|
||||
.withLogConsumer(new Slf4jLogConsumer(logger));
|
||||
broker.start();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void tearDown() {
|
||||
if (broker != null) {
|
||||
broker.close();
|
||||
}
|
||||
}
|
||||
|
||||
@ArgumentsSource(ConfigClasses.class)
|
||||
@ParameterizedTest
|
||||
@SuppressWarnings("unchecked")
|
||||
void testSpringJmsListener(Class<?> configClass)
|
||||
throws ExecutionException, InterruptedException, TimeoutException {
|
||||
// given
|
||||
SpringApplication app = new SpringApplication(configClass);
|
||||
app.setDefaultProperties(defaultConfig());
|
||||
ConfigurableApplicationContext applicationContext = app.run();
|
||||
cleanup.deferCleanup(applicationContext);
|
||||
|
||||
JmsTemplate jmsTemplate = new JmsTemplate(applicationContext.getBean(ConnectionFactory.class));
|
||||
String message = "hello there";
|
||||
|
||||
// when
|
||||
testing.runWithSpan("parent", () -> jmsTemplate.convertAndSend("spring-jms-listener", message));
|
||||
|
||||
// then
|
||||
CompletableFuture<String> receivedMessage =
|
||||
applicationContext.getBean("receivedMessage", CompletableFuture.class);
|
||||
assertThat(receivedMessage.get(10, TimeUnit.SECONDS)).isEqualTo(message);
|
||||
|
||||
testing.waitAndAssertSortedTraces(
|
||||
orderByRootSpanKind(INTERNAL, CONSUMER),
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span -> span.hasName("parent").hasNoParent(),
|
||||
span ->
|
||||
span.hasName("spring-jms-listener send")
|
||||
.hasKind(PRODUCER)
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttributesSatisfying(
|
||||
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
|
||||
equalTo(
|
||||
SemanticAttributes.MESSAGING_DESTINATION, "spring-jms-listener"),
|
||||
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "queue"),
|
||||
satisfies(
|
||||
SemanticAttributes.MESSAGING_MESSAGE_ID,
|
||||
AbstractStringAssert::isNotBlank)),
|
||||
span ->
|
||||
span.hasName("spring-jms-listener process")
|
||||
.hasKind(CONSUMER)
|
||||
.hasParent(trace.getSpan(1))
|
||||
.hasAttributesSatisfying(
|
||||
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
|
||||
equalTo(
|
||||
SemanticAttributes.MESSAGING_DESTINATION, "spring-jms-listener"),
|
||||
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "queue"),
|
||||
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
|
||||
satisfies(
|
||||
SemanticAttributes.MESSAGING_MESSAGE_ID,
|
||||
AbstractStringAssert::isNotBlank)),
|
||||
span -> span.hasName("consumer").hasParent(trace.getSpan(2))),
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("spring-jms-listener receive")
|
||||
.hasKind(CONSUMER)
|
||||
.hasNoParent()
|
||||
.hasAttributesSatisfying(
|
||||
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
|
||||
equalTo(
|
||||
SemanticAttributes.MESSAGING_DESTINATION, "spring-jms-listener"),
|
||||
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "queue"),
|
||||
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"),
|
||||
satisfies(
|
||||
SemanticAttributes.MESSAGING_MESSAGE_ID,
|
||||
AbstractStringAssert::isNotBlank))));
|
||||
}
|
||||
|
||||
@ArgumentsSource(ConfigClasses.class)
|
||||
@ParameterizedTest
|
||||
@SuppressWarnings("unchecked")
|
||||
void shouldCaptureHeaders(Class<?> configClass)
|
||||
throws ExecutionException, InterruptedException, TimeoutException {
|
||||
// given
|
||||
SpringApplication app = new SpringApplication(configClass);
|
||||
app.setDefaultProperties(defaultConfig());
|
||||
ConfigurableApplicationContext applicationContext = app.run();
|
||||
cleanup.deferCleanup(applicationContext);
|
||||
|
||||
JmsTemplate jmsTemplate = new JmsTemplate(applicationContext.getBean(ConnectionFactory.class));
|
||||
String message = "hello there";
|
||||
|
||||
// when
|
||||
testing.runWithSpan(
|
||||
"parent",
|
||||
() ->
|
||||
jmsTemplate.convertAndSend(
|
||||
"spring-jms-listener",
|
||||
message,
|
||||
jmsMessage -> {
|
||||
jmsMessage.setStringProperty("test_message_header", "test");
|
||||
jmsMessage.setIntProperty("test_message_int_header", 1234);
|
||||
return jmsMessage;
|
||||
}));
|
||||
|
||||
// then
|
||||
CompletableFuture<String> receivedMessage =
|
||||
applicationContext.getBean("receivedMessage", CompletableFuture.class);
|
||||
assertThat(receivedMessage.get(10, TimeUnit.SECONDS)).isEqualTo(message);
|
||||
|
||||
testing.waitAndAssertSortedTraces(
|
||||
orderByRootSpanKind(INTERNAL, CONSUMER),
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span -> span.hasName("parent").hasNoParent(),
|
||||
span ->
|
||||
span.hasName("spring-jms-listener send")
|
||||
.hasKind(PRODUCER)
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttributesSatisfying(
|
||||
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
|
||||
equalTo(
|
||||
SemanticAttributes.MESSAGING_DESTINATION, "spring-jms-listener"),
|
||||
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "queue"),
|
||||
satisfies(
|
||||
SemanticAttributes.MESSAGING_MESSAGE_ID,
|
||||
AbstractStringAssert::isNotBlank),
|
||||
equalTo(
|
||||
stringArrayKey("messaging.header.test_message_header"),
|
||||
singletonList("test")),
|
||||
equalTo(
|
||||
stringArrayKey("messaging.header.test_message_int_header"),
|
||||
singletonList("1234"))),
|
||||
span ->
|
||||
span.hasName("spring-jms-listener process")
|
||||
.hasKind(CONSUMER)
|
||||
.hasParent(trace.getSpan(1))
|
||||
.hasAttributesSatisfying(
|
||||
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
|
||||
equalTo(
|
||||
SemanticAttributes.MESSAGING_DESTINATION, "spring-jms-listener"),
|
||||
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "queue"),
|
||||
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
|
||||
satisfies(
|
||||
SemanticAttributes.MESSAGING_MESSAGE_ID,
|
||||
AbstractStringAssert::isNotBlank),
|
||||
equalTo(
|
||||
stringArrayKey("messaging.header.test_message_header"),
|
||||
singletonList("test")),
|
||||
equalTo(
|
||||
stringArrayKey("messaging.header.test_message_int_header"),
|
||||
singletonList("1234"))),
|
||||
span -> span.hasName("consumer").hasParent(trace.getSpan(2))),
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("spring-jms-listener receive")
|
||||
.hasKind(CONSUMER)
|
||||
.hasNoParent()
|
||||
.hasAttributesSatisfying(
|
||||
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
|
||||
equalTo(
|
||||
SemanticAttributes.MESSAGING_DESTINATION, "spring-jms-listener"),
|
||||
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "queue"),
|
||||
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"),
|
||||
satisfies(
|
||||
SemanticAttributes.MESSAGING_MESSAGE_ID,
|
||||
AbstractStringAssert::isNotBlank),
|
||||
equalTo(
|
||||
stringArrayKey("messaging.header.test_message_header"),
|
||||
singletonList("test")),
|
||||
equalTo(
|
||||
stringArrayKey("messaging.header.test_message_int_header"),
|
||||
singletonList("1234")))));
|
||||
}
|
||||
|
||||
private static Map<String, Object> defaultConfig() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put("spring.jmx.enabled", false);
|
||||
props.put("spring.main.web-application-type", "none");
|
||||
props.put("test.broker-url", "tcp://localhost:" + broker.getMappedPort(61616));
|
||||
return props;
|
||||
}
|
||||
|
||||
static final class ConfigClasses implements ArgumentsProvider {
|
||||
|
||||
@Override
|
||||
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
|
||||
return Stream.of(
|
||||
arguments(AnnotatedListenerConfig.class), arguments(ManualListenerConfig.class));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -442,7 +442,8 @@ hideFromDependabot(":instrumentation:spring:spring-data:spring-data-common:testi
|
|||
hideFromDependabot(":instrumentation:spring:spring-integration-4.1:javaagent")
|
||||
hideFromDependabot(":instrumentation:spring:spring-integration-4.1:library")
|
||||
hideFromDependabot(":instrumentation:spring:spring-integration-4.1:testing")
|
||||
hideFromDependabot(":instrumentation:spring:spring-jms-2.0:javaagent")
|
||||
hideFromDependabot(":instrumentation:spring:spring-jms:spring-jms-2.0:javaagent")
|
||||
hideFromDependabot(":instrumentation:spring:spring-jms:spring-jms-6.0:javaagent")
|
||||
hideFromDependabot(":instrumentation:spring:spring-kafka-2.7:javaagent")
|
||||
hideFromDependabot(":instrumentation:spring:spring-kafka-2.7:library")
|
||||
hideFromDependabot(":instrumentation:spring:spring-kafka-2.7:testing")
|
||||
|
|
Loading…
Reference in New Issue