convert spring integration tests to java (#11999)

Co-authored-by: Jay DeLuca <jaydeluca4@gmail.com>
This commit is contained in:
Gregor Zeitlinger 2024-08-27 00:30:45 +02:00 committed by GitHub
parent 53f019b8f6
commit 6700efdd23
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 1044 additions and 940 deletions

View File

@ -1,13 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentTestTrait
class ComplexPropagationTest extends AbstractComplexPropagationTest implements AgentTestTrait {
@Override
Class<?> additionalContextClass() {
null
}
}

View File

@ -1,13 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentTestTrait
class SpringCloudStreamProducerTest extends AbstractSpringCloudStreamProducerTest implements AgentTestTrait {
@Override
Class<?> additionalContextClass() {
null
}
}

View File

@ -1,13 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentTestTrait
class SpringCloudStreamRabbitTest extends AbstractSpringCloudStreamRabbitTest implements AgentTestTrait {
@Override
Class<?> additionalContextClass() {
null
}
}

View File

@ -1,123 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes
import io.opentelemetry.semconv.NetworkAttributes
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
class SpringIntegrationAndRabbitTest extends AgentInstrumentationSpecification implements WithRabbitProducerConsumerTrait {
def setupSpec() {
startRabbit()
}
def cleanupSpec() {
stopRabbit()
}
def "should cooperate with existing RabbitMQ instrumentation"() {
when:
runWithSpan("parent") {
producerContext.getBean("producer", Runnable).run()
}
then:
assertTraces(2) {
trace(0, 7) {
span(0) {
name "parent"
attributes {}
}
span(1) {
name "producer"
childOf span(0)
attributes {}
}
span(2) {
// span created by rabbitmq instrumentation
name "exchange.declare"
childOf span(1)
kind CLIENT
attributes {
"$NetworkAttributes.NETWORK_PEER_ADDRESS" { it == "127.0.0.1" || it == "0:0:0:0:0:0:0:1" || it == null }
"$NetworkAttributes.NETWORK_PEER_PORT" Long
"$NetworkAttributes.NETWORK_TYPE" { it == "ipv4" || it == "ipv6" || it == null }
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "rabbitmq"
}
}
span(3) {
// span created by rabbitmq instrumentation
name "testTopic publish"
childOf span(1)
kind PRODUCER
attributes {
"$NetworkAttributes.NETWORK_PEER_ADDRESS" { it == "127.0.0.1" || it == "0:0:0:0:0:0:0:1" || it == null }
"$NetworkAttributes.NETWORK_PEER_PORT" Long
"$NetworkAttributes.NETWORK_TYPE" { it == "ipv4" || it == "ipv6" || it == null }
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "rabbitmq"
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "testTopic"
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish"
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
"$MessagingIncubatingAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY" String
}
}
// spring-cloud-stream-binder-rabbit listener puts all messages into a BlockingQueue immediately after receiving
// that's why the rabbitmq CONSUMER span will never have any child span (and propagate context, actually)
span(4) {
// span created by rabbitmq instrumentation
name ~/testTopic.anonymous.[-\w]+ process/
childOf span(3)
kind CONSUMER
attributes {
"$NetworkAttributes.NETWORK_PEER_ADDRESS" { it == "127.0.0.1" || it == "0:0:0:0:0:0:0:1" || it == null }
"$NetworkAttributes.NETWORK_PEER_PORT" Long
"$NetworkAttributes.NETWORK_TYPE" { it == "ipv4" || it == "ipv6" || it == null }
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "rabbitmq"
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "testTopic"
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
"$MessagingIncubatingAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY" String
}
}
// spring-integration will detect that spring-rabbit has already created a consumer span and back off
span(5) {
// span created by spring-rabbit instrumentation
name "testTopic process"
childOf span(3)
kind CONSUMER
attributes {
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "rabbitmq"
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "testTopic"
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID" String
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
}
}
span(6) {
name "consumer"
childOf span(5)
attributes {}
}
}
trace(1, 1) {
span(0) {
// span created by rabbitmq instrumentation
name "basic.ack"
kind CLIENT
attributes {
"$NetworkAttributes.NETWORK_PEER_ADDRESS" { it == "127.0.0.1" || it == "0:0:0:0:0:0:0:1" || it == null }
"$NetworkAttributes.NETWORK_PEER_PORT" Long
"$NetworkAttributes.NETWORK_TYPE" { it == "ipv4" || it == "ipv6" || it == null }
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "rabbitmq"
}
}
}
}
}
}

View File

@ -1,13 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentTestTrait
class SpringIntegrationTelemetryTest extends AbstractSpringIntegrationTracingTest implements AgentTestTrait {
@Override
Class<?> additionalContextClass() {
null
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
class ComplexPropagationTest extends AbstractComplexPropagationTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
ComplexPropagationTest() {
super(testing, null);
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
class SpringCloudStreamProducerTest extends AbstractSpringCloudStreamProducerTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
SpringCloudStreamProducerTest() {
super(testing, null);
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
class SpringCloudStreamRabbitTest extends AbstractSpringCloudStreamRabbitTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
SpringCloudStreamRabbitTest() {
super(testing, null);
}
}

View File

@ -0,0 +1,155 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1;
import static io.opentelemetry.instrumentation.testing.GlobalTraceUtil.runWithSpan;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.semconv.NetworkAttributes;
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
class SpringIntegrationAndRabbitTest {
@RegisterExtension RabbitExtension rabbit;
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
SpringIntegrationAndRabbitTest() {
rabbit = new RabbitExtension(null);
}
@Test
void shouldCooperateWithExistingRabbitMqInstrumentation() {
testing.waitForTraces(13); // from rabbitmq instrumentation of startup
testing.clearData();
runWithSpan("parent", () -> rabbit.getBean("producer", Runnable.class).run());
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasTotalAttributeCount(0),
span ->
span.hasName("producer").hasParent(trace.getSpan(0)).hasTotalAttributeCount(0),
span -> span.hasName("exchange.declare"),
span ->
span.hasName("exchange.declare")
.hasParent(trace.getSpan(1))
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
satisfies(
NetworkAttributes.NETWORK_PEER_ADDRESS,
s -> s.isIn("127.0.0.1", "0:0:0:0:0:0:0:1", null)),
satisfies(
NetworkAttributes.NETWORK_PEER_PORT,
l -> l.isInstanceOf(Long.class)),
satisfies(
NetworkAttributes.NETWORK_TYPE, s -> s.isIn("ipv4", "ipv6", null)),
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "rabbitmq")),
span -> span.hasName("queue.declare"),
span -> span.hasName("queue.bind"),
span ->
span.hasName("testTopic publish")
.hasParent(trace.getSpan(1))
.hasKind(SpanKind.PRODUCER)
.hasAttributesSatisfyingExactly(
satisfies(
NetworkAttributes.NETWORK_PEER_ADDRESS,
s -> s.isIn("127.0.0.1", "0:0:0:0:0:0:0:1", null)),
satisfies(
NetworkAttributes.NETWORK_PEER_PORT,
l -> l.isInstanceOf(Long.class)),
satisfies(
NetworkAttributes.NETWORK_TYPE, s -> s.isIn("ipv4", "ipv6", null)),
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "rabbitmq"),
equalTo(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
"testTopic"),
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
l -> l.isInstanceOf(Long.class)),
satisfies(
MessagingIncubatingAttributes
.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY,
s -> s.isInstanceOf(String.class))),
// spring-cloud-stream-binder-rabbit listener puts all messages into a BlockingQueue
// immediately after receiving
// that's why the rabbitmq CONSUMER span will never have any child span (and
// propagate context, actually)
span ->
span.satisfies(
spanData ->
assertThat(spanData.getName())
.matches("testTopic.anonymous.[-\\w]+ process"))
.hasParent(trace.getSpan(6))
.hasKind(SpanKind.CONSUMER)
.hasAttributesSatisfyingExactly(
satisfies(
NetworkAttributes.NETWORK_PEER_ADDRESS,
s -> s.isIn("127.0.0.1", "0:0:0:0:0:0:0:1", null)),
satisfies(
NetworkAttributes.NETWORK_PEER_PORT,
l -> l.isInstanceOf(Long.class)),
satisfies(
NetworkAttributes.NETWORK_TYPE, s -> s.isIn("ipv4", "ipv6", null)),
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "rabbitmq"),
equalTo(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
"testTopic"),
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "process"),
satisfies(
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
l -> l.isInstanceOf(Long.class)),
satisfies(
MessagingIncubatingAttributes
.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY,
s -> s.isInstanceOf(String.class))),
// spring-integration will detect that spring-rabbit has already created a consumer
// span and back off
span ->
span.hasName("testTopic process")
.hasParent(trace.getSpan(6))
.hasKind(SpanKind.CONSUMER)
.hasAttributesSatisfyingExactly(
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "rabbitmq"),
equalTo(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
"testTopic"),
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "process"),
satisfies(
MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID,
s -> s.isInstanceOf(String.class)),
satisfies(
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
l -> l.isInstanceOf(Long.class))),
span ->
span.hasName("consumer").hasParent(trace.getSpan(8)).hasTotalAttributeCount(0)),
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("basic.ack")
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
satisfies(
NetworkAttributes.NETWORK_PEER_ADDRESS,
s -> s.isIn("127.0.0.1", "0:0:0:0:0:0:0:1", null)),
satisfies(
NetworkAttributes.NETWORK_PEER_PORT,
l -> l.isInstanceOf(Long.class)),
satisfies(
NetworkAttributes.NETWORK_TYPE, s -> s.isIn("ipv4", "ipv6", null)),
equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "rabbitmq"))));
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
class SpringIntegrationTelemetryTest extends AbstractSpringIntegrationTracingTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
SpringIntegrationTelemetryTest() {
super(testing, null);
}
}

View File

@ -1,13 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.LibraryTestTrait
class ComplexPropagationTest extends AbstractComplexPropagationTest implements LibraryTestTrait {
@Override
Class<?> additionalContextClass() {
GlobalInterceptorSpringConfig
}
}

View File

@ -1,26 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.instrumentation.spring.integration.v4_1.SpringIntegrationTelemetry
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.config.GlobalChannelInterceptor
import org.springframework.messaging.support.ChannelInterceptor
import static java.util.Collections.singletonList
@Configuration
class GlobalInterceptorSpringConfig {
@GlobalChannelInterceptor
@Bean
ChannelInterceptor otelInterceptor() {
SpringIntegrationTelemetry.builder(GlobalOpenTelemetry.get())
.setCapturedHeaders(singletonList("test-message-header"))
.build()
.newChannelInterceptor()
}
}

View File

@ -1,24 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.instrumentation.spring.integration.v4_1.SpringIntegrationTelemetry
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.config.GlobalChannelInterceptor
import org.springframework.messaging.support.ChannelInterceptor
@Configuration
class GlobalInterceptorWithProducerSpanSpringConfig {
@GlobalChannelInterceptor
@Bean
ChannelInterceptor otelInterceptor() {
SpringIntegrationTelemetry.builder(GlobalOpenTelemetry.get())
.setProducerSpanEnabled(true)
.build()
.newChannelInterceptor()
}
}

View File

@ -1,13 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.LibraryTestTrait
class SpringCloudStreamProducerTest extends AbstractSpringCloudStreamProducerTest implements LibraryTestTrait {
@Override
Class<?> additionalContextClass() {
GlobalInterceptorWithProducerSpanSpringConfig
}
}

View File

@ -1,13 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.LibraryTestTrait
class SpringCloudStreamRabbitTest extends AbstractSpringCloudStreamRabbitTest implements LibraryTestTrait {
@Override
Class<?> additionalContextClass() {
GlobalInterceptorSpringConfig
}
}

View File

@ -1,13 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.LibraryTestTrait
class SpringIntegrationTelemetryTest extends AbstractSpringIntegrationTracingTest implements LibraryTestTrait {
@Override
Class<?> additionalContextClass() {
GlobalInterceptorSpringConfig
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
class ComplexPropagationTest extends AbstractComplexPropagationTest {
@RegisterExtension
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
public ComplexPropagationTest() {
super(testing, GlobalInterceptorSpringConfig.class);
}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1;
import static java.util.Collections.singletonList;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.spring.integration.v4_1.SpringIntegrationTelemetry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.GlobalChannelInterceptor;
import org.springframework.messaging.support.ChannelInterceptor;
@Configuration
class GlobalInterceptorSpringConfig {
@GlobalChannelInterceptor
@Bean
ChannelInterceptor otelInterceptor() {
return SpringIntegrationTelemetry.builder(GlobalOpenTelemetry.get())
.setCapturedHeaders(singletonList("test-message-header"))
.build()
.newChannelInterceptor();
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.spring.integration.v4_1.SpringIntegrationTelemetry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.GlobalChannelInterceptor;
import org.springframework.messaging.support.ChannelInterceptor;
@Configuration
class GlobalInterceptorWithProducerSpanSpringConfig {
@GlobalChannelInterceptor
@Bean
ChannelInterceptor otelInterceptor() {
return SpringIntegrationTelemetry.builder(GlobalOpenTelemetry.get())
.setProducerSpanEnabled(true)
.build()
.newChannelInterceptor();
}
}

View File

@ -0,0 +1,19 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
class SpringCloudStreamProducerTest extends AbstractSpringCloudStreamProducerTest {
@RegisterExtension
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
public SpringCloudStreamProducerTest() {
super(testing, GlobalInterceptorWithProducerSpanSpringConfig.class);
}
}

View File

@ -0,0 +1,19 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
class SpringCloudStreamRabbitTest extends AbstractSpringCloudStreamRabbitTest {
@RegisterExtension
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
public SpringCloudStreamRabbitTest() {
super(testing, GlobalInterceptorSpringConfig.class);
}
}

View File

@ -0,0 +1,19 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
class SpringIntegrationTelemetryTest extends AbstractSpringIntegrationTracingTest {
@RegisterExtension
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
public SpringIntegrationTelemetryTest() {
super(testing, GlobalInterceptorSpringConfig.class);
}
}

View File

@ -1,151 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import org.springframework.boot.SpringApplication
import org.springframework.boot.SpringBootConfiguration
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.ConfigurableApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.event.EventListener
import org.springframework.integration.channel.DirectChannel
import org.springframework.integration.channel.ExecutorChannel
import org.springframework.messaging.Message
import org.springframework.messaging.SubscribableChannel
import org.springframework.messaging.support.MessageBuilder
import spock.lang.Shared
import java.util.concurrent.BlockingQueue
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.stream.Collectors
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
abstract class AbstractComplexPropagationTest extends InstrumentationSpecification {
abstract Class<?> additionalContextClass()
@Shared
ConfigurableApplicationContext applicationContext
def setupSpec() {
def contextClasses = [ExternalQueueConfig]
if (additionalContextClass() != null) {
contextClasses += additionalContextClass()
}
def app = new SpringApplication(contextClasses as Class<?>[])
app.setDefaultProperties([
"spring.main.web-application-type": "none"
])
applicationContext = app.run()
}
def cleanupSpec() {
applicationContext?.close()
}
def "should propagate context through a custom message queue"() {
given:
def sendChannel = applicationContext.getBean("sendChannel", SubscribableChannel)
def receiveChannel = applicationContext.getBean("receiveChannel", SubscribableChannel)
def messageHandler = new CapturingMessageHandler()
receiveChannel.subscribe(messageHandler)
when:
sendChannel.send(MessageBuilder.withPayload("test")
.setHeader("theAnswer", "42")
.build())
then:
messageHandler.join()
assertTraces(1) {
trace(0, 3) {
// there's no span in the context, so spring-integration adds a CONSUMER one
span(0) {
name "application.sendChannel process"
kind CONSUMER
}
// message is received in a separate thread without any context, so a CONSUMER span with parent
// extracted from the incoming message is created
span(1) {
name "application.receiveChannel process"
childOf span(0)
kind CONSUMER
}
span(2) {
name "handler"
childOf span(1)
}
}
}
cleanup:
receiveChannel.unsubscribe(messageHandler)
}
// this setup simulates separate producer/consumer and some "external" message queue in between
@SpringBootConfiguration
@EnableAutoConfiguration
static class ExternalQueueConfig {
@Bean
SubscribableChannel sendChannel() {
new ExecutorChannel(Executors.newSingleThreadExecutor())
}
@Bean
SubscribableChannel receiveChannel() {
new DirectChannel()
}
@Bean
BlockingQueue<Payload> externalQueue() {
new LinkedBlockingQueue<Payload>()
}
@Bean(destroyMethod = "shutdownNow")
ExecutorService consumerThread() {
Executors.newSingleThreadExecutor()
}
@EventListener(ApplicationReadyEvent)
void initialize() {
sendChannel().subscribe { message ->
externalQueue().offer(Payload.from(message))
}
consumerThread().execute({
while (!Thread.interrupted()) {
def payload = externalQueue().take()
receiveChannel().send(payload.toMessage())
}
})
}
}
static class Payload {
String body
Map<String, String> headers
static Payload from(Message<?> message) {
def body = message.payload as String
Map<String, String> headers = message.headers.entrySet().stream()
.filter({ kv -> kv.value instanceof String })
.collect(Collectors.toMap({ it.key }, { it.value }))
new Payload(body: body, headers: headers)
}
Message<String> toMessage() {
MessageBuilder.withPayload(body)
.copyHeaders(headers)
.build()
}
}
}

View File

@ -1,54 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
import static org.junit.jupiter.api.Assumptions.assumeTrue
abstract class AbstractSpringCloudStreamProducerTest extends InstrumentationSpecification implements WithRabbitProducerConsumerTrait {
private static final boolean HAS_PRODUCER_SPAN = Boolean.getBoolean("otel.instrumentation.spring-integration.producer.enabled")
abstract Class<?> additionalContextClass()
def setupSpec() {
startRabbit(additionalContextClass())
}
def cleanupSpec() {
stopRabbit()
}
def "has producer span"() {
assumeTrue(HAS_PRODUCER_SPAN)
when:
producerContext.getBean("producer", Runnable).run()
then:
assertTraces(1) {
trace(0, 4) {
span(0) {
name "producer"
}
span(1) {
name "testProducer.output publish"
childOf span(0)
kind PRODUCER
}
span(2) {
name "testConsumer.input process"
childOf span(1)
kind CONSUMER
}
span(3) {
name "consumer"
childOf span(2)
}
}
}
}
}

View File

@ -1,44 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
abstract class AbstractSpringCloudStreamRabbitTest extends InstrumentationSpecification implements WithRabbitProducerConsumerTrait {
abstract Class<?> additionalContextClass()
def setupSpec() {
startRabbit(additionalContextClass())
}
def cleanupSpec() {
stopRabbit()
}
def "should propagate context through RabbitMQ"() {
when:
producerContext.getBean("producer", Runnable).run()
then:
assertTraces(1) {
trace(0, 3) {
span(0) {
name "producer"
}
span(1) {
name "testConsumer.input process"
childOf span(0)
kind CONSUMER
}
span(2) {
name "consumer"
childOf span(1)
}
}
}
}
}

View File

@ -1,284 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.sdk.trace.data.SpanData
import org.springframework.boot.SpringApplication
import org.springframework.boot.SpringBootConfiguration
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.ConfigurableApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.event.EventListener
import org.springframework.integration.channel.DirectChannel
import org.springframework.integration.channel.interceptor.GlobalChannelInterceptorWrapper
import org.springframework.messaging.Message
import org.springframework.messaging.SubscribableChannel
import org.springframework.messaging.support.ExecutorSubscribableChannel
import org.springframework.messaging.support.MessageBuilder
import spock.lang.Shared
import spock.lang.Unroll
import java.util.concurrent.Executors
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
@Unroll
abstract class AbstractSpringIntegrationTracingTest extends InstrumentationSpecification {
abstract Class<?> additionalContextClass()
@Shared
ConfigurableApplicationContext applicationContext
def setupSpec() {
def contextClasses = [MessageChannelsConfig]
if (additionalContextClass() != null) {
contextClasses += additionalContextClass()
}
def app = new SpringApplication(contextClasses as Class<?>[])
app.setDefaultProperties([
"spring.main.web-application-type": "none"
])
applicationContext = app.run()
}
def cleanupSpec() {
applicationContext?.close()
}
def "should propagate context (#channelName)"() {
given:
def channel = applicationContext.getBean(channelName, SubscribableChannel)
def messageHandler = new CapturingMessageHandler()
channel.subscribe(messageHandler)
when:
channel.send(MessageBuilder.withPayload("test")
.build())
then:
def capturedMessage = messageHandler.join()
assertTraces(1) {
trace(0, 2) {
span(0) {
name interceptorSpanName
kind CONSUMER
}
span(1) {
name "handler"
childOf span(0)
}
def interceptorSpan = span(0)
verifyCorrectSpanWasPropagated(capturedMessage, interceptorSpan)
}
}
cleanup:
channel.unsubscribe(messageHandler)
where:
channelName | interceptorSpanName
"directChannel" | "application.directChannel process"
"executorChannel" | "executorChannel process"
}
def "should not add interceptor twice"() {
given:
def channel = applicationContext.getBean("directChannel1", SubscribableChannel)
def messageHandler = new CapturingMessageHandler()
channel.subscribe(messageHandler)
when:
channel.send(MessageBuilder.withPayload("test")
.build())
then:
def capturedMessage = messageHandler.join()
assertTraces(1) {
trace(0, 2) {
span(0) {
// the channel name is overwritten by the last bean registration
name "application.directChannel2 process"
kind CONSUMER
}
span(1) {
name "handler"
childOf span(0)
}
def interceptorSpan = span(0)
verifyCorrectSpanWasPropagated(capturedMessage, interceptorSpan)
}
}
cleanup:
channel.unsubscribe(messageHandler)
}
def "should not create a span when there is already a span in the context"() {
given:
def channel = applicationContext.getBean("directChannel", SubscribableChannel)
def messageHandler = new CapturingMessageHandler()
channel.subscribe(messageHandler)
when:
runWithSpan("parent") {
channel.send(MessageBuilder.withPayload("test")
.build())
}
then:
messageHandler.join()
assertTraces(1) {
trace(0, 2) {
span(0) {
name "parent"
}
span(1) {
name "handler"
childOf span(0)
}
}
}
cleanup:
channel.unsubscribe(messageHandler)
}
def "should handle multiple message channels in a chain"() {
given:
def channel1 = applicationContext.getBean("linkedChannel1", SubscribableChannel)
def channel2 = applicationContext.getBean("linkedChannel2", SubscribableChannel)
def messageHandler = new CapturingMessageHandler()
channel2.subscribe(messageHandler)
when:
channel1.send(MessageBuilder.withPayload("test")
.build())
then:
def capturedMessage = messageHandler.join()
assertTraces(1) {
trace(0, 2) {
span(0) {
name "application.linkedChannel1 process"
kind CONSUMER
}
span(1) {
name "handler"
childOf span(0)
}
def lastChannelSpan = span(0)
verifyCorrectSpanWasPropagated(capturedMessage, lastChannelSpan)
}
}
cleanup:
channel2.unsubscribe(messageHandler)
}
def "capture message header"() {
given:
def channel = applicationContext.getBean("directChannel", SubscribableChannel)
def messageHandler = new CapturingMessageHandler()
channel.subscribe(messageHandler)
when:
channel.send(MessageBuilder.withPayload("test")
.setHeader("test-message-header", "test")
.build())
then:
def capturedMessage = messageHandler.join()
assertTraces(1) {
trace(0, 2) {
span(0) {
name "application.directChannel process"
kind CONSUMER
}
span(1) {
name "handler"
childOf span(0)
}
def interceptorSpan = span(0)
verifyCorrectSpanWasPropagated(capturedMessage, interceptorSpan)
}
}
cleanup:
channel.unsubscribe(messageHandler)
}
static void verifyCorrectSpanWasPropagated(Message<?> capturedMessage, SpanData parentSpan) {
def propagatedSpan = capturedMessage.headers.get("traceparent") as String
assert propagatedSpan.contains(parentSpan.traceId), "wrong trace id"
assert propagatedSpan.contains(parentSpan.spanId), "wrong span id"
}
@SpringBootConfiguration
@EnableAutoConfiguration
static class MessageChannelsConfig {
SubscribableChannel problematicSharedChannel = new DirectChannel()
@Bean
SubscribableChannel directChannel() {
new DirectChannel()
}
@Bean
SubscribableChannel directChannel1() {
problematicSharedChannel
}
@Bean
SubscribableChannel directChannel2() {
problematicSharedChannel
}
@Bean
SubscribableChannel executorChannel(GlobalChannelInterceptorWrapper otelInterceptor) {
def channel = new ExecutorSubscribableChannel(Executors.newSingleThreadExecutor())
if (!Boolean.getBoolean("testLatestDeps")) {
// spring does not inject the interceptor in 4.1 because ExecutorSubscribableChannel isn't ChannelInterceptorAware
// in later versions spring injects the global interceptor into InterceptableChannel (which ExecutorSubscribableChannel is)
channel.addInterceptor(otelInterceptor.channelInterceptor)
}
channel
}
@Bean
SubscribableChannel linkedChannel1() {
new DirectChannel()
}
@Bean
SubscribableChannel linkedChannel2() {
new DirectChannel()
}
@EventListener(ApplicationReadyEvent)
void initialize() {
linkedChannel1().subscribe { message ->
linkedChannel2().send(message)
}
}
}
}

View File

@ -1,27 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import org.springframework.messaging.Message
import org.springframework.messaging.MessageHandler
import org.springframework.messaging.MessagingException
import java.util.concurrent.CompletableFuture
import static io.opentelemetry.instrumentation.testing.GlobalTraceUtil.runWithSpan
class CapturingMessageHandler implements MessageHandler {
final CompletableFuture<Message<?>> captured = new CompletableFuture<>()
@Override
void handleMessage(Message<?> message) throws MessagingException {
runWithSpan("handler") {
captured.complete(message)
}
}
Message<?> join() {
captured.join()
}
}

View File

@ -1,103 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.SpringApplication
import org.springframework.boot.SpringBootConfiguration
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.cloud.stream.annotation.StreamListener
import org.springframework.cloud.stream.messaging.Sink
import org.springframework.cloud.stream.messaging.Source
import org.springframework.context.ConfigurableApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.messaging.support.MessageBuilder
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.wait.strategy.Wait
import java.time.Duration
import static io.opentelemetry.instrumentation.testing.GlobalTraceUtil.runWithSpan
trait WithRabbitProducerConsumerTrait {
static GenericContainer rabbitMqContainer
static ConfigurableApplicationContext producerContext
static ConfigurableApplicationContext consumerContext
def startRabbit(Class<?> additionalContext = null) {
rabbitMqContainer = new GenericContainer('rabbitmq:latest')
.withExposedPorts(5672)
.waitingFor(Wait.forLogMessage(".*Server startup complete.*", 1))
.withStartupTimeout(Duration.ofMinutes(2))
rabbitMqContainer.start()
def producerApp = new SpringApplication(getContextClasses(ProducerConfig, additionalContext))
producerApp.setDefaultProperties([
"spring.application.name" : "testProducer",
"spring.jmx.enabled" : false,
"spring.main.web-application-type" : "none",
"spring.rabbitmq.host" : rabbitMqContainer.host,
"spring.rabbitmq.port" : rabbitMqContainer.getMappedPort(5672),
"spring.cloud.stream.bindings.output.destination": "testTopic"
])
producerContext = producerApp.run()
def consumerApp = new SpringApplication(getContextClasses(ConsumerConfig, additionalContext))
consumerApp.setDefaultProperties([
"spring.application.name" : "testConsumer",
"spring.jmx.enabled" : false,
"spring.main.web-application-type" : "none",
"spring.rabbitmq.host" : rabbitMqContainer.host,
"spring.rabbitmq.port" : rabbitMqContainer.getMappedPort(5672),
"spring.cloud.stream.bindings.input.destination": "testTopic"
])
consumerContext = consumerApp.run()
}
private Class<?>[] getContextClasses(Class<?> mainContext, Class<?> additionalContext) {
def contextClasses = [mainContext]
if (additionalContext != null) {
contextClasses += additionalContext
}
contextClasses
}
def stopRabbit() {
rabbitMqContainer?.stop()
rabbitMqContainer = null
producerContext?.close()
producerContext = null
consumerContext?.close()
consumerContext = null
}
@SpringBootConfiguration
@EnableAutoConfiguration
@EnableBinding(Source)
static class ProducerConfig {
@Autowired
Source source
@Bean
Runnable producer() {
return {
runWithSpan("producer") {
source.output().send(MessageBuilder.withPayload("test").build())
}
}
}
}
@SpringBootConfiguration
@EnableAutoConfiguration
@EnableBinding(Sink)
static class ConsumerConfig {
@StreamListener(Sink.INPUT)
void consume(String ignored) {
runWithSpan("consumer") {}
}
}
}

View File

@ -0,0 +1,162 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
public abstract class AbstractComplexPropagationTest {
private final Class<?> additionalContextClass;
protected InstrumentationExtension testing;
ConfigurableApplicationContext applicationContext;
public AbstractComplexPropagationTest(
InstrumentationExtension testing, @Nullable Class<?> additionalContextClass) {
this.testing = testing;
this.additionalContextClass = additionalContextClass;
}
@BeforeEach
void setUp() {
List<Class<?>> contextClasses = new ArrayList<>();
contextClasses.add(ExternalQueueConfig.class);
if (additionalContextClass != null) {
contextClasses.add(additionalContextClass);
}
SpringApplication springApplication =
new SpringApplication(contextClasses.toArray(new Class<?>[0]));
springApplication.setDefaultProperties(
Collections.singletonMap("spring.main.web-application-type", "none"));
applicationContext = springApplication.run();
}
@AfterEach
void tearDown() {
if (applicationContext != null) {
applicationContext.close();
}
}
@Test
void shouldPropagateContextThroughAcomplexFlow() {
SubscribableChannel sendChannel =
applicationContext.getBean("sendChannel", SubscribableChannel.class);
SubscribableChannel receiveChannel =
applicationContext.getBean("receiveChannel", SubscribableChannel.class);
CapturingMessageHandler messageHandler = new CapturingMessageHandler();
receiveChannel.subscribe(messageHandler);
sendChannel.send(MessageBuilder.withPayload("test").setHeader("theAnswer", "42").build());
messageHandler.join();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("application.sendChannel process").hasKind(SpanKind.CONSUMER),
span ->
span.hasName("application.receiveChannel process")
.hasParent(trace.getSpan(0))
.hasKind(SpanKind.CONSUMER),
span -> span.hasName("handler").hasParent(trace.getSpan(1))));
receiveChannel.unsubscribe(messageHandler);
}
// this setup simulates separate producer/consumer and some "external" message queue in between
@SpringBootConfiguration
@EnableAutoConfiguration
static class ExternalQueueConfig {
@Bean
SubscribableChannel sendChannel() {
return new ExecutorChannel(Executors.newSingleThreadExecutor());
}
@Bean
SubscribableChannel receiveChannel() {
return new DirectChannel();
}
@Bean
BlockingQueue<Payload> externalQueue() {
return new LinkedBlockingQueue<>();
}
@Bean(destroyMethod = "shutdownNow")
ExecutorService consumerThread() {
return Executors.newSingleThreadExecutor();
}
@EventListener(ApplicationReadyEvent.class)
void initialize() {
sendChannel().subscribe(message -> externalQueue().offer(Payload.from(message)));
consumerThread()
.execute(
() -> {
while (!Thread.interrupted()) {
try {
Payload payload = externalQueue().take();
receiveChannel().send(payload.toMessage());
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
});
}
}
static class Payload {
String body;
Map<String, String> headers;
Payload(String body, Map<String, String> headers) {
this.body = body;
this.headers = headers;
}
static Payload from(Message<?> message) {
String body = (String) message.getPayload();
Map<String, String> headers =
message.getHeaders().entrySet().stream()
.filter(kv -> kv.getValue() instanceof String)
.collect(Collectors.toMap(Map.Entry::getKey, kv -> (String) kv.getValue()));
return new Payload(body, headers);
}
Message<String> toMessage() {
return MessageBuilder.withPayload(body).copyHeaders(headers).build();
}
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
public abstract class AbstractSpringCloudStreamProducerTest {
@RegisterExtension RabbitExtension rabbit;
protected final InstrumentationExtension testing;
private static final boolean HAS_PRODUCER_SPAN =
Boolean.getBoolean("otel.instrumentation.spring-integration.producer.enabled");
public AbstractSpringCloudStreamProducerTest(
InstrumentationExtension testing, Class<?> additionalContextClass) {
this.testing = testing;
rabbit = new RabbitExtension(additionalContextClass);
}
@Test
void hasProducerSpan() {
assumeTrue(HAS_PRODUCER_SPAN);
rabbit.getBean("producer", Runnable.class).run();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("producer").hasKind(SpanKind.INTERNAL),
span ->
span.hasName("testProducer.output publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0)),
span ->
span.hasName("testConsumer.input process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1)),
span ->
span.hasName("consumer")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(2))));
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
public abstract class AbstractSpringCloudStreamRabbitTest {
@RegisterExtension RabbitExtension rabbit;
protected final InstrumentationExtension testing;
public AbstractSpringCloudStreamRabbitTest(
InstrumentationExtension testing, Class<?> additionalContextClass) {
this.testing = testing;
rabbit = new RabbitExtension(additionalContextClass);
}
@Test
void shouldPropagateContextThroughRabbitMq() {
rabbit.getBean("producer", Runnable.class).run();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("producer").hasKind(SpanKind.INTERNAL),
span ->
span.hasName("testConsumer.input process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0)),
span ->
span.hasName("consumer")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(1))));
}
}

View File

@ -0,0 +1,257 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.interceptor.GlobalChannelInterceptorWrapper;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
abstract class AbstractSpringIntegrationTracingTest {
protected final InstrumentationExtension testing;
private final Class<?> additionalContextClass;
ConfigurableApplicationContext applicationContext;
public AbstractSpringIntegrationTracingTest(
InstrumentationExtension testing, Class<?> additionalContextClass) {
this.testing = testing;
this.additionalContextClass = additionalContextClass;
}
@BeforeEach
public void setUp() {
List<Class<?>> contextClasses = new ArrayList<>();
contextClasses.add(MessageChannelsConfig.class);
if (additionalContextClass != null) {
contextClasses.add(additionalContextClass);
}
SpringApplication springApplication =
new SpringApplication(contextClasses.toArray(new Class<?>[0]));
springApplication.setDefaultProperties(
Collections.singletonMap("spring.main.web-application-type", "none"));
applicationContext = springApplication.run();
}
@AfterEach
public void tearDown() {
if (applicationContext != null) {
applicationContext.close();
}
}
@ParameterizedTest
@CsvSource(
value = {
"directChannel,application.directChannel process",
"executorChannel,executorChannel process"
},
delimiter = ',')
public void shouldPropagateContext(String channelName, String interceptorSpanName) {
SubscribableChannel channel =
applicationContext.getBean(channelName, SubscribableChannel.class);
CapturingMessageHandler messageHandler = new CapturingMessageHandler();
channel.subscribe(messageHandler);
channel.send(MessageBuilder.withPayload("test").build());
Message<?> capturedMessage = messageHandler.join();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(interceptorSpanName).hasKind(SpanKind.CONSUMER);
verifyCorrectSpanWasPropagated(capturedMessage, trace.getSpan(0));
},
span -> span.hasName("handler").hasParent(trace.getSpan(0))));
channel.unsubscribe(messageHandler);
}
@Test
void shouldNotAddInterceptorTwice() {
SubscribableChannel channel =
applicationContext.getBean("directChannel1", SubscribableChannel.class);
CapturingMessageHandler messageHandler = new CapturingMessageHandler();
channel.subscribe(messageHandler);
channel.send(MessageBuilder.withPayload("test").build());
Message<?> capturedMessage = messageHandler.join();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName("application.directChannel2 process").hasKind(SpanKind.CONSUMER);
verifyCorrectSpanWasPropagated(capturedMessage, trace.getSpan(0));
},
span -> span.hasName("handler").hasParent(trace.getSpan(0))));
channel.unsubscribe(messageHandler);
}
@Test
void shouldNotCreateAspanWhenThereIsAlreadyAspanInTheContext() {
SubscribableChannel channel =
applicationContext.getBean("directChannel", SubscribableChannel.class);
CapturingMessageHandler messageHandler = new CapturingMessageHandler();
channel.subscribe(messageHandler);
testing.runWithSpan(
"parent",
() -> {
channel.send(MessageBuilder.withPayload("test").build());
});
messageHandler.join();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent"),
span -> span.hasName("handler").hasParent(trace.getSpan(0))));
channel.unsubscribe(messageHandler);
}
@Test
void shouldHandleMultipleMessageChannelsInAchain() {
SubscribableChannel channel1 =
applicationContext.getBean("linkedChannel1", SubscribableChannel.class);
SubscribableChannel channel2 =
applicationContext.getBean("linkedChannel2", SubscribableChannel.class);
CapturingMessageHandler messageHandler = new CapturingMessageHandler();
channel2.subscribe(messageHandler);
channel1.send(MessageBuilder.withPayload("test").build());
Message<?> capturedMessage = messageHandler.join();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName("application.linkedChannel1 process").hasKind(SpanKind.CONSUMER);
verifyCorrectSpanWasPropagated(capturedMessage, trace.getSpan(0));
},
span -> span.hasName("handler").hasParent(trace.getSpan(0))));
channel2.unsubscribe(messageHandler);
}
@Test
void captureMessageHeader() {
SubscribableChannel channel =
applicationContext.getBean("directChannel", SubscribableChannel.class);
CapturingMessageHandler messageHandler = new CapturingMessageHandler();
channel.subscribe(messageHandler);
channel.send(
MessageBuilder.withPayload("test").setHeader("test-message-header", "test").build());
Message<?> capturedMessage = messageHandler.join();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName("application.directChannel process").hasKind(SpanKind.CONSUMER);
verifyCorrectSpanWasPropagated(capturedMessage, trace.getSpan(0));
},
span -> span.hasName("handler").hasParent(trace.getSpan(0))));
channel.unsubscribe(messageHandler);
}
static void verifyCorrectSpanWasPropagated(Message<?> capturedMessage, SpanData parentSpan) {
String propagatedSpan = (String) capturedMessage.getHeaders().get("traceparent");
assertThat(propagatedSpan).contains(parentSpan.getTraceId());
assertThat(propagatedSpan).contains(parentSpan.getSpanId());
}
@SpringBootConfiguration
@EnableAutoConfiguration
public static class MessageChannelsConfig {
SubscribableChannel problematicSharedChannel = new DirectChannel();
@Bean
public SubscribableChannel directChannel() {
return new DirectChannel();
}
@Bean
public SubscribableChannel directChannel1() {
return problematicSharedChannel;
}
@Bean
public SubscribableChannel directChannel2() {
return problematicSharedChannel;
}
@Bean
public SubscribableChannel executorChannel(GlobalChannelInterceptorWrapper otelInterceptor) {
ExecutorSubscribableChannel channel =
new ExecutorSubscribableChannel(Executors.newSingleThreadExecutor());
if (!Boolean.getBoolean("testLatestDeps")) {
// spring does not inject the interceptor in 4.1 because ExecutorSubscribableChannel isn't
// ChannelInterceptorAware
// in later versions spring injects the global interceptor into InterceptableChannel (which
// ExecutorSubscribableChannel is)
channel.addInterceptor(otelInterceptor.getChannelInterceptor());
}
return channel;
}
@Bean
public SubscribableChannel linkedChannel1() {
return new DirectChannel();
}
@Bean
public SubscribableChannel linkedChannel2() {
return new DirectChannel();
}
@EventListener(ApplicationReadyEvent.class)
public void initialize() {
linkedChannel1().subscribe(message -> linkedChannel2().send(message));
}
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1;
import static io.opentelemetry.instrumentation.testing.GlobalTraceUtil.runWithSpan;
import java.util.concurrent.CompletableFuture;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
class CapturingMessageHandler implements MessageHandler {
final CompletableFuture<Message<?>> captured = new CompletableFuture<>();
@Override
public void handleMessage(Message<?> message) {
runWithSpan(
"handler",
() -> {
captured.complete(message);
});
}
Message<?> join() {
return captured.join();
}
}

View File

@ -0,0 +1,135 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1;
import static io.opentelemetry.instrumentation.testing.GlobalTraceUtil.runWithSpan;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.support.MessageBuilder;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
public class RabbitExtension implements BeforeEachCallback, AfterEachCallback {
private GenericContainer<?> rabbitMqContainer;
protected ConfigurableApplicationContext producerContext;
private ConfigurableApplicationContext consumerContext;
private final Class<?> additionalContextClass;
public RabbitExtension(Class<?> additionalContextClass) {
this.additionalContextClass = additionalContextClass;
}
public <T> T getBean(String name, Class<T> requiredType) {
return producerContext.getBean(name, requiredType);
}
@Override
public void beforeEach(ExtensionContext context) {
rabbitMqContainer =
new GenericContainer<>("rabbitmq:latest")
.withExposedPorts(5672)
.waitingFor(Wait.forLogMessage(".*Server startup complete.*", 1))
.withStartupTimeout(Duration.ofMinutes(2));
rabbitMqContainer.start();
SpringApplication producerApp =
new SpringApplication(getContextClasses(ProducerConfig.class, additionalContextClass));
Map<String, Object> producerProperties = new HashMap<>();
producerProperties.put("spring.application.name", "testProducer");
producerProperties.put("spring.jmx.enabled", false);
producerProperties.put("spring.main.web-application-type", "none");
producerProperties.put("spring.rabbitmq.host", rabbitMqContainer.getHost());
producerProperties.put("spring.rabbitmq.port", rabbitMqContainer.getMappedPort(5672));
producerProperties.put("spring.cloud.stream.bindings.output.destination", "testTopic");
producerApp.setDefaultProperties(producerProperties);
producerContext = producerApp.run();
SpringApplication consumerApp =
new SpringApplication(
getContextClasses(ProducerConfig.ConsumerConfig.class, additionalContextClass));
Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put("spring.application.name", "testConsumer");
consumerProperties.put("spring.jmx.enabled", false);
consumerProperties.put("spring.main.web-application-type", "none");
consumerProperties.put("spring.rabbitmq.host", rabbitMqContainer.getHost());
consumerProperties.put("spring.rabbitmq.port", rabbitMqContainer.getMappedPort(5672));
consumerProperties.put("spring.cloud.stream.bindings.input.destination", "testTopic");
consumerApp.setDefaultProperties(consumerProperties);
consumerContext = consumerApp.run();
}
@Override
public void afterEach(ExtensionContext context) {
if (rabbitMqContainer != null) {
rabbitMqContainer.stop();
}
if (producerContext != null) {
producerContext.close();
}
if (consumerContext != null) {
consumerContext.close();
}
}
private static Class<?>[] getContextClasses(Class<?> mainContext, Class<?> additionalContext) {
List<Class<?>> contextClasses = new ArrayList<>();
contextClasses.add(mainContext);
if (additionalContext != null) {
contextClasses.add(additionalContext);
}
return contextClasses.toArray(new Class<?>[0]);
}
@SpringBootConfiguration
@EnableAutoConfiguration
@EnableBinding(Source.class)
static class ProducerConfig {
@Autowired Source source;
@Bean
Runnable producer() {
return () ->
runWithSpan(
"producer",
() -> {
source.output().send(MessageBuilder.withPayload("test").build());
});
}
@SpringBootConfiguration
@EnableAutoConfiguration
@EnableBinding(Sink.class)
static class ConsumerConfig {
@StreamListener(Sink.INPUT)
void consume(String ignored) {
runWithSpan(
"consumer",
() -> {
// do nothing
});
}
}
}
}