diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/DoNothingBatchErrorHandler.groovy b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/DoNothingBatchErrorHandler.groovy new file mode 100644 index 0000000000..6a113388fd --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/DoNothingBatchErrorHandler.groovy @@ -0,0 +1,13 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import org.apache.kafka.clients.consumer.ConsumerRecords +import org.springframework.kafka.listener.BatchErrorHandler + +class DoNothingBatchErrorHandler implements BatchErrorHandler { + @Override + void handle(Exception thrownException, ConsumerRecords data) { + } +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy index db470e19a6..a9113200d6 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy @@ -38,11 +38,14 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { def app = new SpringApplication(ConsumerConfig) app.setDefaultProperties([ - "spring.jmx.enabled" : false, - "spring.main.web-application-type" : "none", - "spring.kafka.bootstrap-servers" : kafka.bootstrapServers, - "spring.kafka.consumer.auto-offset-reset": "earliest", - "spring.kafka.consumer.linger-ms" : 10, + "spring.jmx.enabled" : false, + "spring.main.web-application-type" : "none", + "spring.kafka.bootstrap-servers" : kafka.bootstrapServers, + "spring.kafka.consumer.auto-offset-reset" : "earliest", + "spring.kafka.consumer.linger-ms" : 10, + // wait 1s between poll() calls + "spring.kafka.listener.idle-between-polls" : 1000, + "spring.kafka.producer.transaction-id-prefix": "test-", ]) applicationContext = app.run() } @@ -58,9 +61,11 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { when: runWithSpan("producer") { - kafkaTemplate.send("testTopic", "10", "testSpan1") - kafkaTemplate.send("testTopic", "20", "testSpan2") - kafkaTemplate.flush() + // wrapping in a transaction is needed to remove the possibility of messages being picked up separately by the consumer + kafkaTemplate.executeInTransaction({ ops -> + ops.send("testTopic", "10", "testSpan1") + ops.send("testTopic", "20", "testSpan2") + }) } then: @@ -152,8 +157,9 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { when: runWithSpan("producer") { - kafkaTemplate.send("testTopic", "10", "error") - kafkaTemplate.flush() + kafkaTemplate.executeInTransaction({ ops -> + ops.send("testTopic", "10", "error") + }) } then: @@ -240,6 +246,8 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { ConcurrentKafkaListenerContainerFactory batchFactory( ConsumerFactory consumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>() + // do not retry failed records + factory.setBatchErrorHandler(new DoNothingBatchErrorHandler()) factory.setConsumerFactory(consumerFactory) factory.setBatchListener(true) factory.setAutoStartup(true)