Fix kafka clients latest dep tests (#10687)

This commit is contained in:
Lauri Tulmin 2024-02-27 11:16:19 +02:00 committed by GitHub
parent 26047247b4
commit f62aa9c1f3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 33 additions and 44 deletions

View File

@ -18,6 +18,8 @@ import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.PointData;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
@ -33,13 +35,12 @@ import java.util.Set;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.KafkaConsumerAccess;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.KafkaProducerAccess;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@ -139,14 +140,36 @@ public abstract class AbstractOpenTelemetryMetricsReporterTest {
@Test
void noDuplicateMetricsReporter() {
List<MetricsReporter> producerMetricsReporters =
KafkaProducerAccess.getMetricsReporters(producer);
List<MetricsReporter> producerMetricsReporters = getMetricsReporters(producer);
assertThat(countOpenTelemetryMetricsReporters(producerMetricsReporters)).isEqualTo(1);
List<MetricsReporter> consumerMetricsReporters =
KafkaConsumerAccess.getMetricsReporters(consumer);
List<MetricsReporter> consumerMetricsReporters = getMetricsReporters(consumer);
assertThat(countOpenTelemetryMetricsReporters(consumerMetricsReporters)).isEqualTo(1);
}
private static List<MetricsReporter> getMetricsReporters(Object producerOrConsumer) {
return getMetricsRegistry(producerOrConsumer).reporters();
}
private static Metrics getMetricsRegistry(Object producerOrConsumer) {
Class<?> clazz = producerOrConsumer.getClass();
try {
Field field = clazz.getDeclaredField("metrics");
field.setAccessible(true);
return (Metrics) field.get(producerOrConsumer);
} catch (Exception ignored) {
// Ignore
}
try {
Method method = clazz.getDeclaredMethod("metricsRegistry");
method.setAccessible(true);
return (Metrics) method.invoke(producerOrConsumer);
} catch (Exception ignored) {
// Ignore
}
throw new IllegalStateException(
"Failed to get metrics registry from " + producerOrConsumer.getClass().getName());
}
private static long countOpenTelemetryMetricsReporters(List<MetricsReporter> metricsReporters) {
return metricsReporters.stream()
.filter(reporter -> reporter.getClass().getName().endsWith("OpenTelemetryMetricsReporter"))
@ -176,8 +199,6 @@ public abstract class AbstractOpenTelemetryMetricsReporterTest {
"kafka.consumer.join_total",
"kafka.consumer.last_heartbeat_seconds_ago",
"kafka.consumer.last_rebalance_seconds_ago",
"kafka.consumer.partition_assigned_latency_avg",
"kafka.consumer.partition_assigned_latency_max",
"kafka.consumer.rebalance_latency_avg",
"kafka.consumer.rebalance_latency_max",
"kafka.consumer.rebalance_latency_total",

View File

@ -1,18 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package org.apache.kafka.clients.consumer;
import java.util.List;
import org.apache.kafka.common.metrics.MetricsReporter;
public class KafkaConsumerAccess {
private KafkaConsumerAccess() {}
public static List<MetricsReporter> getMetricsReporters(KafkaConsumer<?, ?> consumer) {
return consumer.metrics.reporters();
}
}

View File

@ -1,18 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package org.apache.kafka.clients.producer;
import java.util.List;
import org.apache.kafka.common.metrics.MetricsReporter;
public class KafkaProducerAccess {
private KafkaProducerAccess() {}
public static List<MetricsReporter> getMetricsReporters(KafkaProducer<?, ?> producer) {
return producer.metrics.reporters();
}
}

View File

@ -19,6 +19,7 @@ dependencies {
tasks {
withType<Test>().configureEach {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
}
val testReceiveSpansDisabled by registering(Test::class) {

View File

@ -24,6 +24,7 @@ import java.io.ObjectStreamClass;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@ -44,6 +45,8 @@ class OpenTelemetryMetricsReporterTest extends AbstractOpenTelemetryMetricsRepor
@Test
void badConfig() {
Assumptions.assumeFalse(Boolean.getBoolean("testLatestDeps"));
// Bad producer config
assertThatThrownBy(
() -> {