Kafka: avoid registering duplicate metrics reporter (#8099)

Resolves
https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/8085
This commit is contained in:
Lauri Tulmin 2023-03-22 13:59:29 +02:00 committed by GitHub
parent 62c124f192
commit 39f7816572
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 66 additions and 1 deletions

View File

@ -17,6 +17,7 @@ import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import java.util.Map; import java.util.Map;
import java.util.regex.Pattern;
import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.RecordMetadata;
@ -33,6 +34,10 @@ public final class KafkaSingletons {
InstrumentationConfig.get() InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.metric-reporter.enabled", true); .getBoolean("otel.instrumentation.kafka.metric-reporter.enabled", true);
private static final Pattern METRIC_REPORTER_PRESENT_PATTERN =
Pattern.compile(
"(^|,)" + Pattern.quote(OpenTelemetryMetricsReporter.class.getName()) + "($|,)");
private static final Instrumenter<KafkaProducerRequest, RecordMetadata> PRODUCER_INSTRUMENTER; private static final Instrumenter<KafkaProducerRequest, RecordMetadata> PRODUCER_INSTRUMENTER;
private static final Instrumenter<KafkaReceiveRequest, Void> CONSUMER_RECEIVE_INSTRUMENTER; private static final Instrumenter<KafkaReceiveRequest, Void> CONSUMER_RECEIVE_INSTRUMENTER;
private static final Instrumenter<KafkaProcessRequest, Void> CONSUMER_PROCESS_INSTRUMENTER; private static final Instrumenter<KafkaProcessRequest, Void> CONSUMER_PROCESS_INSTRUMENTER;
@ -74,7 +79,13 @@ public final class KafkaSingletons {
config.merge( config.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
OpenTelemetryMetricsReporter.class.getName(), OpenTelemetryMetricsReporter.class.getName(),
(class1, class2) -> class1 + "," + class2); (class1, class2) -> {
if (class1 instanceof String
&& METRIC_REPORTER_PRESENT_PATTERN.matcher((String) class1).find()) {
return class1;
}
return class1 + "," + class2;
});
config.put( config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER, OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
new OpenTelemetrySupplier(GlobalOpenTelemetry.get())); new OpenTelemetrySupplier(GlobalOpenTelemetry.get()));

View File

@ -33,7 +33,9 @@ import java.util.Set;
import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.KafkaConsumerAccess;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.KafkaProducerAccess;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricName;
@ -135,6 +137,22 @@ public abstract class AbstractOpenTelemetryMetricsReporterTest {
return consumerConfig; return consumerConfig;
} }
@Test
void noDuplicateMetricsReporter() {
List<MetricsReporter> producerMetricsReporters =
KafkaProducerAccess.getMetricsReporters(producer);
assertThat(countOpenTelemetryMetricsReporters(producerMetricsReporters)).isEqualTo(1);
List<MetricsReporter> consumerMetricsReporters =
KafkaConsumerAccess.getMetricsReporters(consumer);
assertThat(countOpenTelemetryMetricsReporters(consumerMetricsReporters)).isEqualTo(1);
}
private static long countOpenTelemetryMetricsReporters(List<MetricsReporter> metricsReporters) {
return metricsReporters.stream()
.filter(reporter -> reporter.getClass().getName().endsWith("OpenTelemetryMetricsReporter"))
.count();
}
@Test @Test
void observeMetrics() { void observeMetrics() {
produceRecords(); produceRecords();

View File

@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package org.apache.kafka.clients.consumer;
import java.util.List;
import org.apache.kafka.common.metrics.MetricsReporter;
public class KafkaConsumerAccess {
private KafkaConsumerAccess() {}
public static List<MetricsReporter> getMetricsReporters(KafkaConsumer<?, ?> consumer) {
return consumer.metrics.reporters();
}
}

View File

@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package org.apache.kafka.clients.producer;
import java.util.List;
import org.apache.kafka.common.metrics.MetricsReporter;
public class KafkaProducerAccess {
private KafkaProducerAccess() {}
public static List<MetricsReporter> getMetricsReporters(KafkaProducer<?, ?> producer) {
return producer.metrics.reporters();
}
}