Fix spring-kafka-2.7 latest dep tests (#9921)

This commit is contained in:
Lauri Tulmin 2023-11-21 12:59:18 +02:00 committed by GitHub
parent f6b63dac7b
commit e148b84536
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 58 additions and 2 deletions

View File

@ -5,6 +5,7 @@
package io.opentelemetry.testing;
import java.lang.reflect.Method;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.SpringBootConfiguration;
@ -15,6 +16,8 @@ import org.springframework.kafka.config.ContainerCustomizer;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
@SpringBootConfiguration
@EnableAutoConfiguration
@ -50,7 +53,13 @@ public class ConsumerConfig {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
// do not retry failed records
factory.setBatchErrorHandler(new DoNothingBatchErrorHandler());
try {
Class.forName("org.springframework.kafka.listener.BatchErrorHandler");
ErrorHandlerSetter.setBatchErrorHandler(factory);
} catch (ClassNotFoundException ignored) {
// org.springframework.kafka.listener.BatchErrorHandler is missing in latest
setCommonErrorHandler(factory);
}
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
factory.setAutoStartup(true);
@ -68,11 +77,34 @@ public class ConsumerConfig {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
// do not retry failed records
factory.setErrorHandler(new DoNothingErrorHandler());
try {
Class.forName("org.springframework.kafka.listener.ErrorHandler");
ErrorHandlerSetter.setErrorHandler(factory);
} catch (ClassNotFoundException ignored) {
// org.springframework.kafka.listener.ErrorHandler is missing in latest
setCommonErrorHandler(factory);
}
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(false);
factory.setAutoStartup(true);
customizerProvider.ifAvailable(factory::setContainerCustomizer);
return factory;
}
private static void setCommonErrorHandler(
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
try {
Class<?> handlerClass =
Class.forName("org.springframework.kafka.listener.CommonErrorHandler");
Class<?> defaultHandlerClass =
Class.forName("org.springframework.kafka.listener.DefaultErrorHandler");
BackOff backOff = new FixedBackOff(0, 0);
Object handler =
defaultHandlerClass.getDeclaredConstructor(BackOff.class).newInstance(backOff);
Method method = factory.getClass().getMethod("setCommonErrorHandler", handlerClass);
method.invoke(factory, handler);
} catch (Exception exception) {
// ignored
}
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.testing;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
// Base classes for error handlers are missing in the latest version. Setter methods were extracted
// in ConsumerConfig to avoid verifier attempting to load these classes.
class ErrorHandlerSetter {
private ErrorHandlerSetter() {}
static void setBatchErrorHandler(
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
factory.setBatchErrorHandler(new DoNothingBatchErrorHandler());
}
static void setErrorHandler(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
factory.setErrorHandler(new DoNothingErrorHandler());
}
}