Use kafka transactions to make spring-kafka tests more stable (#4024)
* Use kafka transactions to make spring-kafka tests more stable * manual acks * another approach: batch error handler that immediately recovers * another try * yet another try * do nothing error handler * spotless
This commit is contained in:
parent
0f9308b4fb
commit
01ea967d67
|
@ -0,0 +1,13 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords
|
||||
import org.springframework.kafka.listener.BatchErrorHandler
|
||||
|
||||
class DoNothingBatchErrorHandler implements BatchErrorHandler {
|
||||
@Override
|
||||
void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
|
||||
}
|
||||
}
|
|
@ -38,11 +38,14 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
|
|||
|
||||
def app = new SpringApplication(ConsumerConfig)
|
||||
app.setDefaultProperties([
|
||||
"spring.jmx.enabled" : false,
|
||||
"spring.main.web-application-type" : "none",
|
||||
"spring.kafka.bootstrap-servers" : kafka.bootstrapServers,
|
||||
"spring.kafka.consumer.auto-offset-reset": "earliest",
|
||||
"spring.kafka.consumer.linger-ms" : 10,
|
||||
"spring.jmx.enabled" : false,
|
||||
"spring.main.web-application-type" : "none",
|
||||
"spring.kafka.bootstrap-servers" : kafka.bootstrapServers,
|
||||
"spring.kafka.consumer.auto-offset-reset" : "earliest",
|
||||
"spring.kafka.consumer.linger-ms" : 10,
|
||||
// wait 1s between poll() calls
|
||||
"spring.kafka.listener.idle-between-polls" : 1000,
|
||||
"spring.kafka.producer.transaction-id-prefix": "test-",
|
||||
])
|
||||
applicationContext = app.run()
|
||||
}
|
||||
|
@ -58,9 +61,11 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
|
|||
|
||||
when:
|
||||
runWithSpan("producer") {
|
||||
kafkaTemplate.send("testTopic", "10", "testSpan1")
|
||||
kafkaTemplate.send("testTopic", "20", "testSpan2")
|
||||
kafkaTemplate.flush()
|
||||
// wrapping in a transaction is needed to remove the possibility of messages being picked up separately by the consumer
|
||||
kafkaTemplate.executeInTransaction({ ops ->
|
||||
ops.send("testTopic", "10", "testSpan1")
|
||||
ops.send("testTopic", "20", "testSpan2")
|
||||
})
|
||||
}
|
||||
|
||||
then:
|
||||
|
@ -152,8 +157,9 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
|
|||
|
||||
when:
|
||||
runWithSpan("producer") {
|
||||
kafkaTemplate.send("testTopic", "10", "error")
|
||||
kafkaTemplate.flush()
|
||||
kafkaTemplate.executeInTransaction({ ops ->
|
||||
ops.send("testTopic", "10", "error")
|
||||
})
|
||||
}
|
||||
|
||||
then:
|
||||
|
@ -240,6 +246,8 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
|
|||
ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
|
||||
ConsumerFactory<String, String> consumerFactory) {
|
||||
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>()
|
||||
// do not retry failed records
|
||||
factory.setBatchErrorHandler(new DoNothingBatchErrorHandler())
|
||||
factory.setConsumerFactory(consumerFactory)
|
||||
factory.setBatchListener(true)
|
||||
factory.setAutoStartup(true)
|
||||
|
|
Loading…
Reference in New Issue