Don't dynamically change config mid-Kafka test (#1797)

* Don't dynamically change config mid-Kafka test

* Codenarc

* Spotless
This commit is contained in:
Trask Stalnaker 2020-11-29 23:14:11 -08:00 committed by GitHub
parent 573cd0b4e9
commit e92bdb46f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 236 additions and 210 deletions

View File

@ -14,7 +14,7 @@ class ConfiguredTraceAnnotationsTest extends AgentTestRunner {
"package.Class\$Name;${OuterClass.InterestingMethod.name}")
}
def specCleanup() {
def cleanupSpec() {
ConfigUtils.setConfig(PREVIOUS_CONFIG)
}

View File

@ -0,0 +1,82 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentTestRunner
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.junit.Rule
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.KafkaMessageListenerContainer
import org.springframework.kafka.listener.MessageListener
import org.springframework.kafka.test.rule.KafkaEmbedded
import org.springframework.kafka.test.utils.ContainerTestUtils
import org.springframework.kafka.test.utils.KafkaTestUtils
import spock.lang.Unroll
abstract class KafkaClientBaseTest extends AgentTestRunner {
protected static final SHARED_TOPIC = "shared.topic"
protected isPropagationEnabled() {
return true
}
@Rule
KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SHARED_TOPIC)
@Unroll
def "test kafka client header propagation manual config"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
// set up the Kafka consumer properties
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
// create a Kafka consumer factory
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
// set the topic that needs to be consumed
def containerProperties = containerProperties()
// create a Kafka MessageListenerContainer
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
// create a thread safe queue to store the received message
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
// setup a Kafka message listener
container.setupMessageListener(new MessageListener<String, String>() {
@Override
void onMessage(ConsumerRecord<String, String> record) {
records.add(record)
}
})
// start the container and underlying message listener
container.start()
// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
when:
String message = "Testing without headers"
kafkaTemplate.send(SHARED_TOPIC, message)
then:
// check that the message was received
def received = records.poll(5, TimeUnit.SECONDS)
received.headers().iterator().hasNext() == isPropagationEnabled()
cleanup:
producerFactory.stop()
container?.stop()
}
}

View File

@ -0,0 +1,152 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import static io.opentelemetry.api.trace.Span.Kind.CONSUMER
import static io.opentelemetry.api.trace.Span.Kind.PRODUCER
import io.opentelemetry.api.trace.attributes.SemanticAttributes
import io.opentelemetry.instrumentation.test.utils.ConfigUtils
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.KafkaMessageListenerContainer
import org.springframework.kafka.listener.MessageListener
import org.springframework.kafka.test.utils.ContainerTestUtils
import org.springframework.kafka.test.utils.KafkaTestUtils
class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
static final PREVIOUS_CONFIG = ConfigUtils.updateConfigAndResetInstrumentation {
it.setProperty("otel.instrumentation.kafka.client-propagation", "false")
}
def cleanupSpec() {
ConfigUtils.setConfig(PREVIOUS_CONFIG)
}
@Override
protected isPropagationEnabled() {
return false
}
def "should not read remote context when consuming messages if propagation is disabled"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
when: "send message"
String message = "Testing without headers"
kafkaTemplate.send(SHARED_TOPIC, message)
then: "producer span is created"
assertTraces(1) {
trace(0, 1) {
span(0) {
name SHARED_TOPIC + " send"
kind PRODUCER
errored false
hasNoParent()
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
}
}
}
}
when: "read message without context propagation"
// create a thread safe queue to store the received message
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
KafkaMessageListenerContainer<Object, Object> container = startConsumer("consumer-without-propagation", records)
then: "independent consumer span is created"
// check that the message was received
records.poll(5, TimeUnit.SECONDS) != null
assertTraces(2) {
trace(0, 1) {
span(0) {
name SHARED_TOPIC + " send"
kind PRODUCER
errored false
hasNoParent()
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
}
}
}
trace(1, 1) {
span(0) {
name SHARED_TOPIC + " process"
kind CONSUMER
errored false
hasNoParent()
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"partition" { it >= 0 }
"offset" 0
"record.queue_time_ms" { it >= 0 }
}
}
}
}
cleanup:
producerFactory.stop()
container?.stop()
}
protected KafkaMessageListenerContainer<Object, Object> startConsumer(String groupId, records) {
// set up the Kafka consumer properties
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(groupId, "false", embeddedKafka)
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// create a Kafka consumer factory
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
// set the topic that needs to be consumed
def containerProperties = containerProperties()
// create a Kafka MessageListenerContainer
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
// setup a Kafka message listener
container.setupMessageListener(new MessageListener<String, String>() {
@Override
void onMessage(ConsumerRecord<String, String> record) {
records.add(record)
}
})
// start the container and underlying message listener
container.start()
// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
container
}
def containerProperties() {
try {
// Different class names for test and latestDepTest.
return Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC)
} catch (ClassNotFoundException | NoClassDefFoundError e) {
return Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC)
}
}
}

View File

@ -5,14 +5,10 @@
import static io.opentelemetry.api.trace.Span.Kind.CONSUMER
import static io.opentelemetry.api.trace.Span.Kind.PRODUCER
import static io.opentelemetry.instrumentation.test.utils.ConfigUtils.setConfig
import static io.opentelemetry.instrumentation.test.utils.ConfigUtils.updateConfig
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
import io.opentelemetry.api.trace.attributes.SemanticAttributes
import io.opentelemetry.instrumentation.api.config.Config
import io.opentelemetry.instrumentation.test.AgentTestRunner
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.consumer.ConsumerConfig
@ -23,22 +19,15 @@ import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.Rule
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.KafkaMessageListenerContainer
import org.springframework.kafka.listener.MessageListener
import org.springframework.kafka.test.rule.KafkaEmbedded
import org.springframework.kafka.test.utils.ContainerTestUtils
import org.springframework.kafka.test.utils.KafkaTestUtils
import spock.lang.Unroll
class KafkaClientTest extends AgentTestRunner {
static final SHARED_TOPIC = "shared.topic"
@Rule
KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SHARED_TOPIC)
class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
def "test kafka produce and consume"() {
setup:
@ -380,197 +369,6 @@ class KafkaClientTest extends AgentTestRunner {
producer.close()
}
@Unroll
def "test kafka client header propagation manual config"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
// set up the Kafka consumer properties
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
// create a Kafka consumer factory
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
// set the topic that needs to be consumed
def containerProperties = containerProperties()
// create a Kafka MessageListenerContainer
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
// create a thread safe queue to store the received message
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
// setup a Kafka message listener
container.setupMessageListener(new MessageListener<String, String>() {
@Override
void onMessage(ConsumerRecord<String, String> record) {
records.add(record)
}
})
// start the container and underlying message listener
container.start()
// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
when:
String message = "Testing without headers"
def previousConfig = setPropagation(propagationEnabled)
kafkaTemplate.send(SHARED_TOPIC, message)
setConfig(previousConfig)
then:
// check that the message was received
def received = records.poll(5, TimeUnit.SECONDS)
received.headers().iterator().hasNext() == propagationEnabled
cleanup:
producerFactory.stop()
container?.stop()
where:
propagationEnabled << [false, true]
}
def "should not read remote context when consuming messages if propagation is disabled"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
when: "send message"
String message = "Testing without headers"
kafkaTemplate.send(SHARED_TOPIC, message)
then: "producer span is created"
assertTraces(1) {
trace(0, 1) {
span(0) {
name SHARED_TOPIC + " send"
kind PRODUCER
errored false
hasNoParent()
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
}
}
}
}
when: "read message with context propagation"
// create a thread safe queue to store the received message
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
KafkaMessageListenerContainer<Object, Object> container = startConsumer("consumer-with-propagation", records)
then: "consumer span is created and is a child of the producer"
// check that the message was received
records.poll(5, TimeUnit.SECONDS) != null
assertTraces(1) {
trace(0, 2) {
span(0) {
name SHARED_TOPIC + " send"
kind PRODUCER
errored false
hasNoParent()
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
}
}
span(1) {
name SHARED_TOPIC + " process"
kind CONSUMER
errored false
childOf span(0)
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"partition" { it >= 0 }
"offset" 0
"record.queue_time_ms" { it >= 0 }
}
}
}
}
container.stop()
when: "read message without context propagation"
def previousConfig = setPropagation(false)
records.clear()
container = startConsumer("consumer-without-propagation", records)
then: "independent consumer span is created"
// check that the message was received
records.poll(5, TimeUnit.SECONDS) != null
assertTraces(2) {
trace(0, 2) {
span(0) {
name SHARED_TOPIC + " send"
kind PRODUCER
errored false
hasNoParent()
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
}
}
span(1) {
name SHARED_TOPIC + " process"
kind CONSUMER
errored false
childOf span(0)
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"partition" { it >= 0 }
"offset" 0
"record.queue_time_ms" { it >= 0 }
}
}
}
trace(1, 1) {
span(0) {
name SHARED_TOPIC + " process"
kind CONSUMER
errored false
hasNoParent()
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"partition" { it >= 0 }
"offset" 0
"record.queue_time_ms" { it >= 0 }
}
}
}
}
cleanup:
producerFactory.stop()
container?.stop()
setConfig(previousConfig)
}
protected KafkaMessageListenerContainer<Object, Object> startConsumer(String groupId, records) {
// set up the Kafka consumer properties
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(groupId, "false", embeddedKafka)
@ -610,10 +408,4 @@ class KafkaClientTest extends AgentTestRunner {
return Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC)
}
}
private static Config setPropagation(boolean propagationEnabled) {
return updateConfig {
it.setProperty("otel.instrumentation.kafka.client-propagation", Boolean.toString(propagationEnabled))
}
}
}