Handle kafka METRIC_REPORTER_CLASSES_CONFIG being set to a List (#9155)

This commit is contained in:
Lauri Tulmin 2023-08-09 14:48:58 +03:00 committed by GitHub
parent 60296bbc72
commit 54d0e0718f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 10 deletions

View File

@ -16,8 +16,9 @@ import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier;
import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties; import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.regex.Pattern;
import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.RecordMetadata;
@ -34,10 +35,6 @@ public final class KafkaSingletons {
InstrumentationConfig.get() InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.metric-reporter.enabled", true); .getBoolean("otel.instrumentation.kafka.metric-reporter.enabled", true);
private static final Pattern METRIC_REPORTER_PRESENT_PATTERN =
Pattern.compile(
"(^|,)" + Pattern.quote(OpenTelemetryMetricsReporter.class.getName()) + "($|,)");
private static final Instrumenter<KafkaProducerRequest, RecordMetadata> PRODUCER_INSTRUMENTER; private static final Instrumenter<KafkaProducerRequest, RecordMetadata> PRODUCER_INSTRUMENTER;
private static final Instrumenter<KafkaReceiveRequest, Void> CONSUMER_RECEIVE_INSTRUMENTER; private static final Instrumenter<KafkaReceiveRequest, Void> CONSUMER_RECEIVE_INSTRUMENTER;
private static final Instrumenter<KafkaProcessRequest, Void> CONSUMER_PROCESS_INSTRUMENTER; private static final Instrumenter<KafkaProcessRequest, Void> CONSUMER_PROCESS_INSTRUMENTER;
@ -72,22 +69,29 @@ public final class KafkaSingletons {
return CONSUMER_PROCESS_INSTRUMENTER; return CONSUMER_PROCESS_INSTRUMENTER;
} }
@SuppressWarnings("unchecked")
public static void enhanceConfig(Map<? super String, Object> config) { public static void enhanceConfig(Map<? super String, Object> config) {
if (!METRICS_ENABLED) { // skip enhancing configuration when metrics are disabled or when we have already enhanced it
if (!METRICS_ENABLED
|| config.get(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME)
!= null) {
return; return;
} }
config.merge( config.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
OpenTelemetryMetricsReporter.class.getName(), OpenTelemetryMetricsReporter.class.getName(),
(class1, class2) -> { (class1, class2) -> {
if (class1 instanceof String) { // class1 is either a class name or List of class names or classes
if (class1 instanceof List) {
List<Object> result = new ArrayList<>();
result.addAll((List<Object>) class1);
result.add(class2);
return result;
} else if (class1 instanceof String) {
String className1 = (String) class1; String className1 = (String) class1;
if (className1.isEmpty()) { if (className1.isEmpty()) {
return class2; return class2;
} }
if (METRIC_REPORTER_PRESENT_PATTERN.matcher(className1).find()) {
return class1;
}
} }
return class1 + "," + class2; return class1 + "," + class2;
}); });

View File

@ -10,6 +10,7 @@ import static java.util.Collections.emptyMap;
import io.opentelemetry.instrumentation.kafka.internal.AbstractOpenTelemetryMetricsReporterTest; import io.opentelemetry.instrumentation.kafka.internal.AbstractOpenTelemetryMetricsReporterTest;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import java.util.Collections;
import java.util.Map; import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
@ -48,4 +49,34 @@ class OpenTelemetryMetricsReporterTest extends AbstractOpenTelemetryMetricsRepor
producerConfig.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, ""); producerConfig.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, "");
new KafkaProducer<>(producerConfig).close(); new KafkaProducer<>(producerConfig).close();
} }
@Test
void classListMetricsReporter() {
Map<String, Object> consumerConfig = consumerConfig();
consumerConfig.put(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
Collections.singletonList(TestMetricsReporter.class));
new KafkaConsumer<>(consumerConfig).close();
Map<String, Object> producerConfig = producerConfig();
producerConfig.put(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
Collections.singletonList(TestMetricsReporter.class));
new KafkaProducer<>(producerConfig).close();
}
@Test
void stringListMetricsReporter() {
Map<String, Object> consumerConfig = consumerConfig();
consumerConfig.put(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
Collections.singletonList(TestMetricsReporter.class.getName()));
new KafkaConsumer<>(consumerConfig).close();
Map<String, Object> producerConfig = producerConfig();
producerConfig.put(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
Collections.singletonList(TestMetricsReporter.class.getName()));
new KafkaProducer<>(producerConfig).close();
}
} }