From 39f78165728a1906b97075f2381882217f124347 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Wed, 22 Mar 2023 13:59:29 +0200 Subject: [PATCH] Kafka: avoid registering duplicate metrics reporter (#8099) Resolves https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/8085 --- .../kafkaclients/v0_11/KafkaSingletons.java | 13 ++++++++++++- ...stractOpenTelemetryMetricsReporterTest.java | 18 ++++++++++++++++++ .../clients/consumer/KafkaConsumerAccess.java | 18 ++++++++++++++++++ .../clients/producer/KafkaProducerAccess.java | 18 ++++++++++++++++++ 4 files changed, 66 insertions(+), 1 deletion(-) create mode 100644 instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumerAccess.java create mode 100644 instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/org/apache/kafka/clients/producer/KafkaProducerAccess.java diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaSingletons.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaSingletons.java index 27bc234545..3eb5f020e5 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaSingletons.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaSingletons.java @@ -17,6 +17,7 @@ import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; import java.util.Map; +import java.util.regex.Pattern; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.RecordMetadata; @@ -33,6 +34,10 @@ public final class KafkaSingletons { InstrumentationConfig.get() .getBoolean("otel.instrumentation.kafka.metric-reporter.enabled", true); + private static final Pattern METRIC_REPORTER_PRESENT_PATTERN = + Pattern.compile( + "(^|,)" + Pattern.quote(OpenTelemetryMetricsReporter.class.getName()) + "($|,)"); + private static final Instrumenter PRODUCER_INSTRUMENTER; private static final Instrumenter CONSUMER_RECEIVE_INSTRUMENTER; private static final Instrumenter CONSUMER_PROCESS_INSTRUMENTER; @@ -74,7 +79,13 @@ public final class KafkaSingletons { config.merge( CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, OpenTelemetryMetricsReporter.class.getName(), - (class1, class2) -> class1 + "," + class2); + (class1, class2) -> { + if (class1 instanceof String + && METRIC_REPORTER_PRESENT_PATTERN.matcher((String) class1).find()) { + return class1; + } + return class1 + "," + class2; + }); config.put( OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER, new OpenTelemetrySupplier(GlobalOpenTelemetry.get())); diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/AbstractOpenTelemetryMetricsReporterTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/AbstractOpenTelemetryMetricsReporterTest.java index f22bd0cec3..79741f4d24 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/AbstractOpenTelemetryMetricsReporterTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/AbstractOpenTelemetryMetricsReporterTest.java @@ -33,7 +33,9 @@ import java.util.Set; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.KafkaConsumerAccess; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.KafkaProducerAccess; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.MetricName; @@ -135,6 +137,22 @@ public abstract class AbstractOpenTelemetryMetricsReporterTest { return consumerConfig; } + @Test + void noDuplicateMetricsReporter() { + List producerMetricsReporters = + KafkaProducerAccess.getMetricsReporters(producer); + assertThat(countOpenTelemetryMetricsReporters(producerMetricsReporters)).isEqualTo(1); + List consumerMetricsReporters = + KafkaConsumerAccess.getMetricsReporters(consumer); + assertThat(countOpenTelemetryMetricsReporters(consumerMetricsReporters)).isEqualTo(1); + } + + private static long countOpenTelemetryMetricsReporters(List metricsReporters) { + return metricsReporters.stream() + .filter(reporter -> reporter.getClass().getName().endsWith("OpenTelemetryMetricsReporter")) + .count(); + } + @Test void observeMetrics() { produceRecords(); diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumerAccess.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumerAccess.java new file mode 100644 index 0000000000..2b698af5dd --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumerAccess.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.kafka.clients.consumer; + +import java.util.List; +import org.apache.kafka.common.metrics.MetricsReporter; + +public class KafkaConsumerAccess { + + private KafkaConsumerAccess() {} + + public static List getMetricsReporters(KafkaConsumer consumer) { + return consumer.metrics.reporters(); + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/org/apache/kafka/clients/producer/KafkaProducerAccess.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/org/apache/kafka/clients/producer/KafkaProducerAccess.java new file mode 100644 index 0000000000..6121524f6f --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/org/apache/kafka/clients/producer/KafkaProducerAccess.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.kafka.clients.producer; + +import java.util.List; +import org.apache.kafka.common.metrics.MetricsReporter; + +public class KafkaProducerAccess { + + private KafkaProducerAccess() {} + + public static List getMetricsReporters(KafkaProducer producer) { + return producer.metrics.reporters(); + } +}