diff --git a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfig.java b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfig.java index f24070bc1b..4c97068197 100644 --- a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfig.java +++ b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfig.java @@ -5,6 +5,7 @@ package io.opentelemetry.testing; +import java.lang.reflect.Method; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.SpringBootConfiguration; @@ -15,6 +16,8 @@ import org.springframework.kafka.config.ContainerCustomizer; import org.springframework.kafka.config.TopicBuilder; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.FixedBackOff; @SpringBootConfiguration @EnableAutoConfiguration @@ -50,7 +53,13 @@ public class ConsumerConfig { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); // do not retry failed records - factory.setBatchErrorHandler(new DoNothingBatchErrorHandler()); + try { + Class.forName("org.springframework.kafka.listener.BatchErrorHandler"); + ErrorHandlerSetter.setBatchErrorHandler(factory); + } catch (ClassNotFoundException ignored) { + // org.springframework.kafka.listener.BatchErrorHandler is missing in latest + setCommonErrorHandler(factory); + } factory.setConsumerFactory(consumerFactory); factory.setBatchListener(true); factory.setAutoStartup(true); @@ -68,11 +77,34 @@ public class ConsumerConfig { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); // do not retry failed records - factory.setErrorHandler(new DoNothingErrorHandler()); + try { + Class.forName("org.springframework.kafka.listener.ErrorHandler"); + ErrorHandlerSetter.setErrorHandler(factory); + } catch (ClassNotFoundException ignored) { + // org.springframework.kafka.listener.ErrorHandler is missing in latest + setCommonErrorHandler(factory); + } factory.setConsumerFactory(consumerFactory); factory.setBatchListener(false); factory.setAutoStartup(true); customizerProvider.ifAvailable(factory::setContainerCustomizer); return factory; } + + private static void setCommonErrorHandler( + ConcurrentKafkaListenerContainerFactory factory) { + try { + Class handlerClass = + Class.forName("org.springframework.kafka.listener.CommonErrorHandler"); + Class defaultHandlerClass = + Class.forName("org.springframework.kafka.listener.DefaultErrorHandler"); + BackOff backOff = new FixedBackOff(0, 0); + Object handler = + defaultHandlerClass.getDeclaredConstructor(BackOff.class).newInstance(backOff); + Method method = factory.getClass().getMethod("setCommonErrorHandler", handlerClass); + method.invoke(factory, handler); + } catch (Exception exception) { + // ignored + } + } } diff --git a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ErrorHandlerSetter.java b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ErrorHandlerSetter.java new file mode 100644 index 0000000000..4d2c97c42a --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ErrorHandlerSetter.java @@ -0,0 +1,24 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.testing; + +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; + +// Base classes for error handlers are missing in the latest version. Setter methods were extracted +// in ConsumerConfig to avoid verifier attempting to load these classes. +class ErrorHandlerSetter { + + private ErrorHandlerSetter() {} + + static void setBatchErrorHandler( + ConcurrentKafkaListenerContainerFactory factory) { + factory.setBatchErrorHandler(new DoNothingBatchErrorHandler()); + } + + static void setErrorHandler(ConcurrentKafkaListenerContainerFactory factory) { + factory.setErrorHandler(new DoNothingErrorHandler()); + } +}