From 01ea967d67746e022910feb7fdf2ba2e968cb40c Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Tue, 31 Aug 2021 00:04:16 +0200 Subject: [PATCH] Use kafka transactions to make spring-kafka tests more stable (#4024) * Use kafka transactions to make spring-kafka tests more stable * manual acks * another approach: batch error handler that immediately recovers * another try * yet another try * do nothing error handler * spotless --- .../groovy/DoNothingBatchErrorHandler.groovy | 13 +++++++++ .../SpringKafkaInstrumentationTest.groovy | 28 ++++++++++++------- 2 files changed, 31 insertions(+), 10 deletions(-) create mode 100644 instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/DoNothingBatchErrorHandler.groovy diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/DoNothingBatchErrorHandler.groovy b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/DoNothingBatchErrorHandler.groovy new file mode 100644 index 0000000000..6a113388fd --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/DoNothingBatchErrorHandler.groovy @@ -0,0 +1,13 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import org.apache.kafka.clients.consumer.ConsumerRecords +import org.springframework.kafka.listener.BatchErrorHandler + +class DoNothingBatchErrorHandler implements BatchErrorHandler { + @Override + void handle(Exception thrownException, ConsumerRecords data) { + } +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy index db470e19a6..a9113200d6 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy @@ -38,11 +38,14 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { def app = new SpringApplication(ConsumerConfig) app.setDefaultProperties([ - "spring.jmx.enabled" : false, - "spring.main.web-application-type" : "none", - "spring.kafka.bootstrap-servers" : kafka.bootstrapServers, - "spring.kafka.consumer.auto-offset-reset": "earliest", - "spring.kafka.consumer.linger-ms" : 10, + "spring.jmx.enabled" : false, + "spring.main.web-application-type" : "none", + "spring.kafka.bootstrap-servers" : kafka.bootstrapServers, + "spring.kafka.consumer.auto-offset-reset" : "earliest", + "spring.kafka.consumer.linger-ms" : 10, + // wait 1s between poll() calls + "spring.kafka.listener.idle-between-polls" : 1000, + "spring.kafka.producer.transaction-id-prefix": "test-", ]) applicationContext = app.run() } @@ -58,9 +61,11 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { when: runWithSpan("producer") { - kafkaTemplate.send("testTopic", "10", "testSpan1") - kafkaTemplate.send("testTopic", "20", "testSpan2") - kafkaTemplate.flush() + // wrapping in a transaction is needed to remove the possibility of messages being picked up separately by the consumer + kafkaTemplate.executeInTransaction({ ops -> + ops.send("testTopic", "10", "testSpan1") + ops.send("testTopic", "20", "testSpan2") + }) } then: @@ -152,8 +157,9 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { when: runWithSpan("producer") { - kafkaTemplate.send("testTopic", "10", "error") - kafkaTemplate.flush() + kafkaTemplate.executeInTransaction({ ops -> + ops.send("testTopic", "10", "error") + }) } then: @@ -240,6 +246,8 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { ConcurrentKafkaListenerContainerFactory batchFactory( ConsumerFactory consumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>() + // do not retry failed records + factory.setBatchErrorHandler(new DoNothingBatchErrorHandler()) factory.setConsumerFactory(consumerFactory) factory.setBatchListener(true) factory.setAutoStartup(true)