Add an option to disable automatic kafka interceptor configuration in spring starter (#12833)

This commit is contained in:
Lauri Tulmin 2024-12-07 05:10:27 +02:00 committed by GitHub
parent 81c7713bb2
commit c2c5d80f72
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 57 additions and 21 deletions

View File

@ -5,43 +5,55 @@
package io.opentelemetry.instrumentation.spring.autoconfigure.internal.instrumentation.kafka;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import org.springframework.beans.factory.ObjectProvider;
import java.lang.reflect.Field;
import java.util.function.Supplier;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.RecordInterceptor;
class ConcurrentKafkaListenerContainerFactoryPostProcessor implements BeanPostProcessor {
private final ObjectProvider<OpenTelemetry> openTelemetryProvider;
private final ObjectProvider<ConfigProperties> configPropertiesProvider;
private final Supplier<SpringKafkaTelemetry> springKafkaTelemetry;
ConcurrentKafkaListenerContainerFactoryPostProcessor(
ObjectProvider<OpenTelemetry> openTelemetryProvider,
ObjectProvider<ConfigProperties> configPropertiesProvider) {
this.openTelemetryProvider = openTelemetryProvider;
this.configPropertiesProvider = configPropertiesProvider;
Supplier<SpringKafkaTelemetry> springKafkaTelemetry) {
this.springKafkaTelemetry = springKafkaTelemetry;
}
@SuppressWarnings("unchecked")
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (!(bean instanceof ConcurrentKafkaListenerContainerFactory)) {
return bean;
}
ConcurrentKafkaListenerContainerFactory<?, ?> listenerContainerFactory =
(ConcurrentKafkaListenerContainerFactory<?, ?>) bean;
SpringKafkaTelemetry springKafkaTelemetry =
SpringKafkaTelemetry.builder(openTelemetryProvider.getObject())
.setCaptureExperimentalSpanAttributes(
configPropertiesProvider
.getObject()
.getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
.build();
listenerContainerFactory.setBatchInterceptor(springKafkaTelemetry.createBatchInterceptor());
listenerContainerFactory.setRecordInterceptor(springKafkaTelemetry.createRecordInterceptor());
ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory =
(ConcurrentKafkaListenerContainerFactory<Object, Object>) bean;
SpringKafkaTelemetry springKafkaTelemetry = this.springKafkaTelemetry.get();
// use reflection to read existing values to avoid overwriting user configured interceptors
BatchInterceptor<Object, Object> batchInterceptor =
readField(listenerContainerFactory, "batchInterceptor", BatchInterceptor.class);
RecordInterceptor<Object, Object> recordInterceptor =
readField(listenerContainerFactory, "recordInterceptor", RecordInterceptor.class);
listenerContainerFactory.setBatchInterceptor(
springKafkaTelemetry.createBatchInterceptor(batchInterceptor));
listenerContainerFactory.setRecordInterceptor(
springKafkaTelemetry.createRecordInterceptor(recordInterceptor));
return listenerContainerFactory;
}
private static <T> T readField(Object container, String filedName, Class<T> fieldType) {
try {
Field field = AbstractKafkaListenerContainerFactory.class.getDeclaredField(filedName);
field.setAccessible(true);
return fieldType.cast(field.get(container));
} catch (Exception exception) {
return null;
}
}
}

View File

@ -8,9 +8,11 @@ package io.opentelemetry.instrumentation.spring.autoconfigure.internal.instrumen
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry;
import io.opentelemetry.instrumentation.spring.autoconfigure.internal.ConditionalOnEnabledInstrumentation;
import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -33,13 +35,29 @@ public class KafkaInstrumentationAutoConfiguration {
return producerFactory -> producerFactory.addPostProcessor(kafkaTelemetry::wrap);
}
@Bean
static SpringKafkaTelemetry getTelemetry(
ObjectProvider<OpenTelemetry> openTelemetryProvider,
ObjectProvider<ConfigProperties> configPropertiesProvider) {
return SpringKafkaTelemetry.builder(openTelemetryProvider.getObject())
.setCaptureExperimentalSpanAttributes(
configPropertiesProvider
.getObject()
.getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
.build();
}
// static to avoid "is not eligible for getting processed by all BeanPostProcessors" warning
@Bean
@ConditionalOnProperty(
name = "otel.instrumentation.kafka.autoconfigure-interceptor",
havingValue = "true",
matchIfMissing = true)
static ConcurrentKafkaListenerContainerFactoryPostProcessor
otelKafkaListenerContainerFactoryBeanPostProcessor(
ObjectProvider<OpenTelemetry> openTelemetryProvider,
ObjectProvider<ConfigProperties> configPropertiesProvider) {
return new ConcurrentKafkaListenerContainerFactoryPostProcessor(
openTelemetryProvider, configPropertiesProvider);
() -> getTelemetry(openTelemetryProvider, configPropertiesProvider));
}
}

View File

@ -357,6 +357,12 @@
"description": "Enable the capture of experimental Kafka span attributes.",
"defaultValue": false
},
{
"name": "otel.instrumentation.kafka.autoconfigure-interceptor",
"type": "java.lang.Boolean",
"description": "Enable automatic configuration of tracing interceptors on <code>ConcurrentKafkaListenerContainerFactory</code> using a <code>BeanPostProcessor</code>. You may disable this if you wish to manually configure these interceptors.",
"defaultValue": true
},
{
"name": "otel.instrumentation.mongo.enabled",
"type": "java.lang.Boolean",