Add kafka client metrics to the javaagent instrumentation (#6533)

* Add kafka client metrics to the javaagent instrumentation

* Don't override user-set metrics reporter

* Add kafka metric reporter config property
This commit is contained in:
Mateusz Rzeszutek 2022-09-13 05:36:14 +02:00 committed by GitHub
parent d779a6399a
commit c96a49e253
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 648 additions and 435 deletions

View File

@ -28,6 +28,8 @@ tasks {
withType<Test>().configureEach { withType<Test>().configureEach {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
// TODO run tests both with and without experimental span attributes // TODO run tests both with and without experimental span attributes
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true") jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
} }

View File

@ -7,6 +7,8 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients;
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext; import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.consumerReceiveInstrumenter; import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.consumerReceiveInstrumenter;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.enhanceConfig;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.returns;
@ -21,6 +23,8 @@ import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTra
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.time.Duration; import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher; import net.bytebuddy.matcher.ElementMatcher;
@ -36,6 +40,12 @@ public class KafkaConsumerInstrumentation implements TypeInstrumentation {
@Override @Override
public void transform(TypeTransformer transformer) { public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Map.class)),
this.getClass().getName() + "$ConstructorMapAdvice");
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Properties.class)),
this.getClass().getName() + "$ConstructorPropertiesAdvice");
transformer.applyAdviceToMethod( transformer.applyAdviceToMethod(
named("poll") named("poll")
.and(isPublic()) .and(isPublic())
@ -45,6 +55,24 @@ public class KafkaConsumerInstrumentation implements TypeInstrumentation {
this.getClass().getName() + "$PollAdvice"); this.getClass().getName() + "$PollAdvice");
} }
// Advice applied to the KafkaConsumer constructor taking a Map config; injects the
// OpenTelemetry metrics reporter settings into the config before the consumer is built.
@SuppressWarnings("unused")
public static class ConstructorMapAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(0) Map<String, Object> config) {
// Mutates the caller-supplied config map in place (see KafkaSingletons.enhanceConfig).
enhanceConfig(config);
}
}
// Advice applied to the KafkaConsumer constructor taking a Properties config; mirrors
// ConstructorMapAdvice for the Properties-based constructor overload.
@SuppressWarnings("unused")
public static class ConstructorPropertiesAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(0) Properties config) {
// Properties extends Hashtable<Object,Object>, so the same enhanceConfig overload applies.
enhanceConfig(config);
}
}
@SuppressWarnings("unused") @SuppressWarnings("unused")
public static class PollAdvice { public static class PollAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class) @Advice.OnMethodEnter(suppress = Throwable.class)

View File

@ -5,7 +5,9 @@
package io.opentelemetry.javaagent.instrumentation.kafkaclients; package io.opentelemetry.javaagent.instrumentation.kafkaclients;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.enhanceConfig;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.producerInstrumenter; import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.producerInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.named;
@ -17,6 +19,8 @@ import io.opentelemetry.instrumentation.kafka.internal.KafkaPropagation;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Map;
import java.util.Properties;
import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher; import net.bytebuddy.matcher.ElementMatcher;
@ -33,17 +37,41 @@ public class KafkaProducerInstrumentation implements TypeInstrumentation {
@Override @Override
public void transform(TypeTransformer transformer) { public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Map.class)),
this.getClass().getName() + "$ConstructorMapAdvice");
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Properties.class)),
this.getClass().getName() + "$ConstructorPropertiesAdvice");
transformer.applyAdviceToMethod( transformer.applyAdviceToMethod(
isMethod() isMethod()
.and(isPublic()) .and(isPublic())
.and(named("send")) .and(named("send"))
.and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerRecord"))) .and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerRecord")))
.and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback"))), .and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback"))),
KafkaProducerInstrumentation.class.getName() + "$ProducerAdvice"); KafkaProducerInstrumentation.class.getName() + "$SendAdvice");
} }
@SuppressWarnings("unused") @SuppressWarnings("unused")
public static class ProducerAdvice { public static class ConstructorMapAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(0) Map<String, Object> config) {
enhanceConfig(config);
}
}
// Advice applied to the KafkaProducer constructor taking a Properties config; injects the
// OpenTelemetry metrics reporter settings into the config before the producer is built.
@SuppressWarnings("unused")
public static class ConstructorPropertiesAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(0) Properties config) {
// Mutates the caller-supplied config in place (see KafkaSingletons.enhanceConfig).
enhanceConfig(config);
}
}
@SuppressWarnings("unused")
public static class SendAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class) @Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter( public static void onEnter(

View File

@ -8,8 +8,11 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients;
import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory; import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
@ -20,6 +23,9 @@ public final class KafkaSingletons {
private static final boolean PROPAGATION_ENABLED = private static final boolean PROPAGATION_ENABLED =
InstrumentationConfig.get() InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.client-propagation.enabled", true); .getBoolean("otel.instrumentation.kafka.client-propagation.enabled", true);
private static final boolean METRICS_ENABLED =
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.metric-reporter.enabled", true);
private static final Instrumenter<ProducerRecord<?, ?>, Void> PRODUCER_INSTRUMENTER; private static final Instrumenter<ProducerRecord<?, ?>, Void> PRODUCER_INSTRUMENTER;
private static final Instrumenter<ConsumerRecords<?, ?>, Void> CONSUMER_RECEIVE_INSTRUMENTER; private static final Instrumenter<ConsumerRecords<?, ?>, Void> CONSUMER_RECEIVE_INSTRUMENTER;
@ -56,5 +62,20 @@ public final class KafkaSingletons {
return CONSUMER_PROCESS_INSTRUMENTER; return CONSUMER_PROCESS_INSTRUMENTER;
} }
/**
 * Adds the {@link OpenTelemetryMetricsReporter} and its required settings to a Kafka client
 * config, unless kafka metrics have been disabled via
 * {@code otel.instrumentation.kafka.metric-reporter.enabled}.
 *
 * <p>The reporter class is appended (comma-separated) to any user-configured
 * {@code metric.reporters} value rather than replacing it, so user-set reporters keep working.
 *
 * @param config the client config to mutate in place; accepts both {@code Map<String, Object>}
 *     and {@code Properties} (which is a {@code Map<Object, Object>})
 */
public static void enhanceConfig(Map<? super String, Object> config) {
if (!METRICS_ENABLED) {
return;
}
// merge (not put): preserves any metrics reporter the user already configured
config.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
OpenTelemetryMetricsReporter.class.getName(),
(class1, class2) -> class1 + "," + class2);
// hand the javaagent's global OpenTelemetry instance to the reporter via config entries
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, GlobalOpenTelemetry.get());
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME,
INSTRUMENTATION_NAME);
}
private KafkaSingletons() {} private KafkaSingletons() {}
} }

View File

@ -0,0 +1,36 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients;
import static java.util.Collections.emptyMap;
import io.opentelemetry.instrumentation.kafka.internal.AbstractOpenTelemetryMetricsReporterTest;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import java.util.Map;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.api.extension.RegisterExtension;
// Javaagent variant of the metrics reporter test: the agent instrumentation injects the
// reporter automatically, so no additional client config is needed (contrast with the
// library variant, which passes KafkaTelemetry.metricConfigProperties()).
@EnabledIfSystemProperty(
named = "testLatestDeps",
matches = "true",
disabledReason =
"kafka-clients 0.11 emits a significantly different set of metrics; it's probably fine to just test the latest version")
class OpenTelemetryMetricsReporterTest extends AbstractOpenTelemetryMetricsReporterTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@Override
protected InstrumentationExtension testing() {
return testing;
}
@Override
protected Map<String, ?> additionalConfig() {
// agent wires the reporter into client configs itself; nothing extra to add here
return emptyMap();
}
}

View File

@ -10,4 +10,8 @@ dependencies {
implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library")) implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library"))
implementation("org.testcontainers:kafka") implementation("org.testcontainers:kafka")
implementation("org.testcontainers:junit-jupiter")
compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")
} }

View File

@ -0,0 +1,449 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafka.internal;
import static java.lang.System.lineSeparator;
import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.assertj.core.api.Assertions.assertThat;
import com.google.auto.value.AutoValue;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.PointData;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
/**
 * Shared test harness verifying that Kafka client metrics are bridged to OpenTelemetry metrics
 * by {@link OpenTelemetryMetricsReporter}. Subclasses supply the instrumentation extension
 * (agent vs. library) and any extra client config via {@link #additionalConfig()}.
 *
 * <p>Spins up a single Kafka broker in a Testcontainers container, produces and consumes
 * records, then asserts that an expected set of {@code kafka.consumer.*} / {@code kafka.producer.*}
 * metric names is present and that attribute keys are consistent across points of each metric.
 */
@SuppressWarnings("OtelInternalJavadoc")
@Testcontainers
public abstract class AbstractOpenTelemetryMetricsReporterTest {
private static final Logger logger =
LoggerFactory.getLogger(AbstractOpenTelemetryMetricsReporterTest.class);
// records are spread randomly over these topics to exercise per-topic metrics
private static final List<String> TOPICS = Arrays.asList("foo", "bar", "baz", "qux");
private static final Random RANDOM = new Random();
private static KafkaContainer kafka;
private static KafkaProducer<byte[], byte[]> producer;
private static KafkaConsumer<byte[], byte[]> consumer;
// NOTE(review): method is named beforeAll but annotated @BeforeEach — it relies on the
// null-check below for one-time initialization; presumably @BeforeEach is used so that
// subclass config (additionalConfig()) is available. Confirm intent before renaming.
@BeforeEach
void beforeAll() {
// only start the kafka container the first time this runs
if (kafka != null) {
return;
}
kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"))
.withLogConsumer(new Slf4jLogConsumer(logger))
.waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1))
.withStartupTimeout(Duration.ofMinutes(1));
kafka.start();
producer = new KafkaProducer<>(producerConfig());
consumer = new KafkaConsumer<>(consumerConfig());
}
@AfterAll
static void afterAll() {
// NOTE(review): broker is stopped before the clients are closed; clients may log
// connection errors during close. Consider closing producer/consumer first.
kafka.stop();
producer.close();
consumer.close();
}
@AfterEach
void tearDown() {
// clear reporter state so metrics registered in one test don't leak into the next
OpenTelemetryMetricsReporter.resetForTest();
}
/** The instrumentation extension (agent or library) collecting exported metrics. */
protected abstract InstrumentationExtension testing();
/** Extra client config entries the concrete test needs (e.g. library reporter wiring). */
protected abstract Map<String, ?> additionalConfig();
// Producer config pointing at the container broker; also registers TestMetricsReporter
// so the test can record which kafka metric ids were seen.
protected Map<String, Object> producerConfig() {
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-client-id");
producerConfig.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerConfig.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
producerConfig.putAll(additionalConfig());
// append (not overwrite) so the reporter added by additionalConfig()/the agent is kept
producerConfig.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
TestMetricsReporter.class.getName(),
(o, o2) -> o + "," + o2);
return producerConfig;
}
// Consumer config; MAX_POLL_RECORDS=2 forces many poll round-trips for fetch metrics.
protected Map<String, Object> consumerConfig() {
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
consumerConfig.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerConfig.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
consumerConfig.putAll(additionalConfig());
// append (not overwrite) so any previously configured reporter is kept
consumerConfig.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
TestMetricsReporter.class.getName(),
(o, o2) -> o + "," + o2);
return consumerConfig;
}
// Main assertion: after producing and consuming, the expected OpenTelemetry metric names
// are exported, and every point of a given metric carries the same attribute key set.
@Test
void observeMetrics() {
produceRecords();
consumeRecords();
Set<String> expectedMetricNames =
new HashSet<>(
Arrays.asList(
"kafka.consumer.commit_latency_avg",
"kafka.consumer.commit_latency_max",
"kafka.consumer.commit_rate",
"kafka.consumer.commit_total",
"kafka.consumer.failed_rebalance_rate_per_hour",
"kafka.consumer.failed_rebalance_total",
"kafka.consumer.heartbeat_rate",
"kafka.consumer.heartbeat_response_time_max",
"kafka.consumer.heartbeat_total",
"kafka.consumer.join_rate",
"kafka.consumer.join_time_avg",
"kafka.consumer.join_time_max",
"kafka.consumer.join_total",
"kafka.consumer.last_heartbeat_seconds_ago",
"kafka.consumer.last_rebalance_seconds_ago",
"kafka.consumer.partition_assigned_latency_avg",
"kafka.consumer.partition_assigned_latency_max",
"kafka.consumer.partition_lost_latency_avg",
"kafka.consumer.partition_lost_latency_max",
"kafka.consumer.partition_revoked_latency_avg",
"kafka.consumer.partition_revoked_latency_max",
"kafka.consumer.rebalance_latency_avg",
"kafka.consumer.rebalance_latency_max",
"kafka.consumer.rebalance_latency_total",
"kafka.consumer.rebalance_rate_per_hour",
"kafka.consumer.rebalance_total",
"kafka.consumer.sync_rate",
"kafka.consumer.sync_time_avg",
"kafka.consumer.sync_time_max",
"kafka.consumer.sync_total",
"kafka.consumer.bytes_consumed_rate",
"kafka.consumer.bytes_consumed_total",
"kafka.consumer.fetch_latency_avg",
"kafka.consumer.fetch_latency_max",
"kafka.consumer.fetch_rate",
"kafka.consumer.fetch_size_avg",
"kafka.consumer.fetch_size_max",
"kafka.consumer.fetch_throttle_time_avg",
"kafka.consumer.fetch_throttle_time_max",
"kafka.consumer.fetch_total",
"kafka.consumer.records_consumed_rate",
"kafka.consumer.records_consumed_total",
"kafka.consumer.records_lag",
"kafka.consumer.records_lag_avg",
"kafka.consumer.records_lag_max",
"kafka.consumer.records_lead",
"kafka.consumer.records_lead_avg",
"kafka.consumer.records_lead_min",
"kafka.consumer.records_per_request_avg",
"kafka.consumer.connection_close_rate",
"kafka.consumer.connection_close_total",
"kafka.consumer.connection_count",
"kafka.consumer.connection_creation_rate",
"kafka.consumer.connection_creation_total",
"kafka.consumer.failed_authentication_rate",
"kafka.consumer.failed_authentication_total",
"kafka.consumer.failed_reauthentication_rate",
"kafka.consumer.failed_reauthentication_total",
"kafka.consumer.incoming_byte_rate",
"kafka.consumer.incoming_byte_total",
"kafka.consumer.io_ratio",
"kafka.consumer.io_time_ns_avg",
"kafka.consumer.io_wait_ratio",
"kafka.consumer.io_wait_time_ns_avg",
"kafka.consumer.io_waittime_total",
"kafka.consumer.iotime_total",
"kafka.consumer.last_poll_seconds_ago",
"kafka.consumer.network_io_rate",
"kafka.consumer.network_io_total",
"kafka.consumer.outgoing_byte_rate",
"kafka.consumer.outgoing_byte_total",
"kafka.consumer.poll_idle_ratio_avg",
"kafka.consumer.reauthentication_latency_avg",
"kafka.consumer.reauthentication_latency_max",
"kafka.consumer.request_rate",
"kafka.consumer.request_size_avg",
"kafka.consumer.request_size_max",
"kafka.consumer.request_total",
"kafka.consumer.response_rate",
"kafka.consumer.response_total",
"kafka.consumer.select_rate",
"kafka.consumer.select_total",
"kafka.consumer.successful_authentication_no_reauth_total",
"kafka.consumer.successful_authentication_rate",
"kafka.consumer.successful_authentication_total",
"kafka.consumer.successful_reauthentication_rate",
"kafka.consumer.successful_reauthentication_total",
"kafka.consumer.time_between_poll_avg",
"kafka.consumer.time_between_poll_max",
"kafka.consumer.request_latency_avg",
"kafka.consumer.request_latency_max",
"kafka.producer.batch_size_avg",
"kafka.producer.batch_size_max",
"kafka.producer.batch_split_rate",
"kafka.producer.batch_split_total",
"kafka.producer.buffer_available_bytes",
"kafka.producer.buffer_exhausted_rate",
"kafka.producer.buffer_exhausted_total",
"kafka.producer.buffer_total_bytes",
"kafka.producer.bufferpool_wait_ratio",
"kafka.producer.bufferpool_wait_time_total",
"kafka.producer.compression_rate_avg",
"kafka.producer.connection_close_rate",
"kafka.producer.connection_close_total",
"kafka.producer.connection_count",
"kafka.producer.connection_creation_rate",
"kafka.producer.connection_creation_total",
"kafka.producer.failed_authentication_rate",
"kafka.producer.failed_authentication_total",
"kafka.producer.failed_reauthentication_rate",
"kafka.producer.failed_reauthentication_total",
"kafka.producer.incoming_byte_rate",
"kafka.producer.incoming_byte_total",
"kafka.producer.io_ratio",
"kafka.producer.io_time_ns_avg",
"kafka.producer.io_wait_ratio",
"kafka.producer.io_wait_time_ns_avg",
"kafka.producer.io_waittime_total",
"kafka.producer.iotime_total",
"kafka.producer.metadata_age",
"kafka.producer.network_io_rate",
"kafka.producer.network_io_total",
"kafka.producer.outgoing_byte_rate",
"kafka.producer.outgoing_byte_total",
"kafka.producer.produce_throttle_time_avg",
"kafka.producer.produce_throttle_time_max",
"kafka.producer.reauthentication_latency_avg",
"kafka.producer.reauthentication_latency_max",
"kafka.producer.record_error_rate",
"kafka.producer.record_error_total",
"kafka.producer.record_queue_time_avg",
"kafka.producer.record_queue_time_max",
"kafka.producer.record_retry_rate",
"kafka.producer.record_retry_total",
"kafka.producer.record_send_rate",
"kafka.producer.record_send_total",
"kafka.producer.record_size_avg",
"kafka.producer.record_size_max",
"kafka.producer.records_per_request_avg",
"kafka.producer.request_latency_avg",
"kafka.producer.request_latency_max",
"kafka.producer.request_rate",
"kafka.producer.request_size_avg",
"kafka.producer.request_size_max",
"kafka.producer.request_total",
"kafka.producer.requests_in_flight",
"kafka.producer.response_rate",
"kafka.producer.response_total",
"kafka.producer.select_rate",
"kafka.producer.select_total",
"kafka.producer.successful_authentication_no_reauth_total",
"kafka.producer.successful_authentication_rate",
"kafka.producer.successful_authentication_total",
"kafka.producer.successful_reauthentication_rate",
"kafka.producer.successful_reauthentication_total",
"kafka.producer.waiting_threads",
"kafka.producer.byte_rate",
"kafka.producer.byte_total",
"kafka.producer.compression_rate"));
List<MetricData> metrics = testing().metrics();
Set<String> metricNames = metrics.stream().map(MetricData::getName).collect(toSet());
// containsAll (not equals): extra metrics beyond the expected set are tolerated
assertThat(metricNames).containsAll(expectedMetricNames);
assertThat(metrics)
.allSatisfy(
metricData -> {
// all points of a metric must share the attribute key set of the first point
Set<String> expectedKeys =
metricData.getData().getPoints().stream()
.findFirst()
.map(
point ->
point.getAttributes().asMap().keySet().stream()
.map(AttributeKey::getKey)
.collect(toSet()))
.orElse(Collections.emptySet());
assertThat(metricData.getData().getPoints())
.extracting(PointData::getAttributes)
.extracting(
attributes ->
attributes.asMap().keySet().stream()
.map(AttributeKey::getKey)
.collect(toSet()))
.allSatisfy(attributeKeys -> assertThat(attributeKeys).isEqualTo(expectedKeys));
});
// Print mapping table
printMappingTable();
}
// Sends 100 small gzip-compressed records to random topics from TOPICS.
private static void produceRecords() {
for (int i = 0; i < 100; i++) {
producer.send(
new ProducerRecord<>(
TOPICS.get(RANDOM.nextInt(TOPICS.size())),
0,
System.currentTimeMillis(),
"key".getBytes(StandardCharsets.UTF_8),
"value".getBytes(StandardCharsets.UTF_8)));
}
}
// Polls for ~10 seconds so consumer fetch/commit metrics accumulate.
private static void consumeRecords() {
consumer.subscribe(TOPICS);
Instant stopTime = Instant.now().plusSeconds(10);
while (Instant.now().isBefore(stopTime)) {
// NOTE(review): poll(long) is the old timeout-millis overload; poll(Duration) is the
// non-deprecated replacement in newer kafka-clients versions
consumer.poll(1_000);
}
}
/**
 * Print a table mapping kafka metrics to equivalent OpenTelemetry metrics, in markdown format.
 */
private static void printMappingTable() {
StringBuilder sb = new StringBuilder();
// Append table headers
sb.append(
"| Metric Group | Metric Name | Attribute Keys | Instrument Name | Instrument Description | Instrument Type |")
.append(lineSeparator())
.append(
"|--------------|-------------|----------------|-----------------|------------------------|-----------------|")
.append(lineSeparator());
Map<String, List<KafkaMetricId>> kafkaMetricsByGroup =
TestMetricsReporter.seenMetrics.stream().collect(groupingBy(KafkaMetricId::getGroup));
List<RegisteredObservable> registeredObservables =
OpenTelemetryMetricsReporter.getRegisteredObservables();
// Iterate through groups in alpha order
for (String group : kafkaMetricsByGroup.keySet().stream().sorted().collect(toList())) {
List<KafkaMetricId> kafkaMetricIds =
kafkaMetricsByGroup.get(group).stream()
.sorted(
comparing(KafkaMetricId::getName)
.thenComparing(kafkaMetricId -> kafkaMetricId.getAttributeKeys().size()))
.collect(toList());
// Iterate through metrics in alpha order by name
for (KafkaMetricId kafkaMetricId : kafkaMetricIds) {
// Find first (there may be multiple) registered instrument that matches the kafkaMetricId
Optional<InstrumentDescriptor> descriptor =
registeredObservables.stream()
.filter(
registeredObservable ->
KafkaMetricId.create(registeredObservable.getKafkaMetricName())
.equals(kafkaMetricId))
.findFirst()
.map(RegisteredObservable::getInstrumentDescriptor);
// Append table row
sb.append(
String.format(
"| %s | %s | %s | %s | %s | %s |%n",
"`" + group + "`",
"`" + kafkaMetricId.getName() + "`",
kafkaMetricId.getAttributeKeys().stream()
.map(key -> "`" + key + "`")
.collect(joining(",")),
descriptor.map(i -> "`" + i.getName() + "`").orElse(""),
descriptor.map(InstrumentDescriptor::getDescription).orElse(""),
descriptor.map(i -> "`" + i.getInstrumentType() + "`").orElse("")));
}
}
logger.info("Mapping table" + System.lineSeparator() + sb);
}
/**
 * This class is internal and is hence not for public use. Its APIs are unstable and can change at
 * any time.
 */
public static class TestMetricsReporter implements MetricsReporter {
// every kafka metric id observed by either client; read by printMappingTable
private static final Set<KafkaMetricId> seenMetrics = new HashSet<>();
@Override
public void init(List<KafkaMetric> list) {
list.forEach(this::metricChange);
}
@Override
public void metricChange(KafkaMetric kafkaMetric) {
seenMetrics.add(KafkaMetricId.create(kafkaMetric.metricName()));
}
@Override
public void metricRemoval(KafkaMetric kafkaMetric) {}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> map) {}
}
// Value type identifying a kafka metric by group, name, and tag keys (values ignored),
// used to match seen kafka metrics against registered OpenTelemetry observables.
@AutoValue
abstract static class KafkaMetricId {
abstract String getGroup();
abstract String getName();
abstract Set<String> getAttributeKeys();
static KafkaMetricId create(MetricName metricName) {
return new AutoValue_AbstractOpenTelemetryMetricsReporterTest_KafkaMetricId(
metricName.group(), metricName.name(), metricName.tags().keySet());
}
}
}

View File

@ -150,6 +150,9 @@ public final class KafkaTelemetry {
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
OpenTelemetryMetricsReporter.class.getName()); OpenTelemetryMetricsReporter.class.getName());
config.put(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, openTelemetry); config.put(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, openTelemetry);
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME,
KafkaTelemetryBuilder.INSTRUMENTATION_NAME);
return Collections.unmodifiableMap(config); return Collections.unmodifiableMap(config);
} }

View File

@ -18,7 +18,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
public final class KafkaTelemetryBuilder { public final class KafkaTelemetryBuilder {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-2.6"; static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-2.6";
private final OpenTelemetry openTelemetry; private final OpenTelemetry openTelemetry;
private final List<AttributesExtractor<ProducerRecord<?, ?>, Void>> producerAttributesExtractors = private final List<AttributesExtractor<ProducerRecord<?, ?>, Void>> producerAttributesExtractors =

View File

@ -5,132 +5,30 @@
package io.opentelemetry.instrumentation.kafka.internal; package io.opentelemetry.instrumentation.kafka.internal;
import static java.lang.System.lineSeparator;
import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.assertThatThrownBy;
import com.google.auto.value.AutoValue;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry; import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.PointData;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
@Testcontainers class OpenTelemetryMetricsReporterTest extends AbstractOpenTelemetryMetricsReporterTest {
class OpenTelemetryMetricsReporterTest {
private static final Logger logger =
LoggerFactory.getLogger(OpenTelemetryMetricsReporterTest.class);
private static final List<String> TOPICS = Arrays.asList("foo", "bar", "baz", "qux");
private static final Random RANDOM = new Random();
@RegisterExtension @RegisterExtension
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
private static KafkaContainer kafka; @Override
private static KafkaProducer<byte[], byte[]> producer; protected InstrumentationExtension testing() {
private static KafkaConsumer<byte[], byte[]> consumer; return testing;
@BeforeAll
static void beforeAll() {
kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"))
.withLogConsumer(new Slf4jLogConsumer(logger))
.waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1))
.withStartupTimeout(Duration.ofMinutes(1));
kafka.start();
producer = new KafkaProducer<>(producerConfig());
consumer = new KafkaConsumer<>(consumerConfig());
} }
@AfterAll @Override
static void afterAll() { protected Map<String, ?> additionalConfig() {
kafka.stop(); return KafkaTelemetry.create(testing.getOpenTelemetry()).metricConfigProperties();
producer.close();
consumer.close();
}
@AfterEach
void tearDown() {
OpenTelemetryMetricsReporter.resetForTest();
}
private static Map<String, Object> producerConfig() {
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-client-id");
producerConfig.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerConfig.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
producerConfig.putAll(
KafkaTelemetry.create(testing.getOpenTelemetry()).metricConfigProperties());
producerConfig.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
TestMetricsReporter.class.getName(),
(o, o2) -> o + "," + o2);
return producerConfig;
}
private static Map<String, Object> consumerConfig() {
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
consumerConfig.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerConfig.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
consumerConfig.putAll(
KafkaTelemetry.create(testing.getOpenTelemetry()).metricConfigProperties());
consumerConfig.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
TestMetricsReporter.class.getName(),
(o, o2) -> o + "," + o2);
return consumerConfig;
} }
@Test @Test
@ -154,6 +52,26 @@ class OpenTelemetryMetricsReporterTest {
.hasRootCauseInstanceOf(IllegalStateException.class) .hasRootCauseInstanceOf(IllegalStateException.class)
.hasRootCauseMessage( .hasRootCauseMessage(
"Configuration property opentelemetry.instance is not instance of OpenTelemetry"); "Configuration property opentelemetry.instance is not instance of OpenTelemetry");
assertThatThrownBy(
() -> {
Map<String, Object> producerConfig = producerConfig();
producerConfig.remove(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME);
new KafkaProducer<>(producerConfig).close();
})
.hasRootCauseInstanceOf(IllegalStateException.class)
.hasRootCauseMessage(
"Missing required configuration property: opentelemetry.instrumentation_name");
assertThatThrownBy(
() -> {
Map<String, Object> producerConfig = producerConfig();
producerConfig.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME, 42);
new KafkaProducer<>(producerConfig).close();
})
.hasRootCauseInstanceOf(IllegalStateException.class)
.hasRootCauseMessage(
"Configuration property opentelemetry.instrumentation_name is not instance of String");
// Bad consumer config // Bad consumer config
assertThatThrownBy( assertThatThrownBy(
@ -174,318 +92,25 @@ class OpenTelemetryMetricsReporterTest {
.hasRootCauseInstanceOf(IllegalStateException.class) .hasRootCauseInstanceOf(IllegalStateException.class)
.hasRootCauseMessage( .hasRootCauseMessage(
"Configuration property opentelemetry.instance is not instance of OpenTelemetry"); "Configuration property opentelemetry.instance is not instance of OpenTelemetry");
} assertThatThrownBy(
() -> {
@Test Map<String, Object> consumerConfig = consumerConfig();
void observeMetrics() { consumerConfig.remove(
produceRecords(); OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME);
consumeRecords(); new KafkaConsumer<>(consumerConfig).close();
})
Set<String> expectedMetricNames = .hasRootCauseInstanceOf(IllegalStateException.class)
new HashSet<>( .hasRootCauseMessage(
Arrays.asList( "Missing required configuration property: opentelemetry.instrumentation_name");
"kafka.consumer.commit_latency_avg", assertThatThrownBy(
"kafka.consumer.commit_latency_max", () -> {
"kafka.consumer.commit_rate", Map<String, Object> consumerConfig = consumerConfig();
"kafka.consumer.commit_total", consumerConfig.put(
"kafka.consumer.failed_rebalance_rate_per_hour", OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME, 42);
"kafka.consumer.failed_rebalance_total", new KafkaConsumer<>(consumerConfig).close();
"kafka.consumer.heartbeat_rate", })
"kafka.consumer.heartbeat_response_time_max", .hasRootCauseInstanceOf(IllegalStateException.class)
"kafka.consumer.heartbeat_total", .hasRootCauseMessage(
"kafka.consumer.join_rate", "Configuration property opentelemetry.instrumentation_name is not instance of String");
"kafka.consumer.join_time_avg",
"kafka.consumer.join_time_max",
"kafka.consumer.join_total",
"kafka.consumer.last_heartbeat_seconds_ago",
"kafka.consumer.last_rebalance_seconds_ago",
"kafka.consumer.partition_assigned_latency_avg",
"kafka.consumer.partition_assigned_latency_max",
"kafka.consumer.partition_lost_latency_avg",
"kafka.consumer.partition_lost_latency_max",
"kafka.consumer.partition_revoked_latency_avg",
"kafka.consumer.partition_revoked_latency_max",
"kafka.consumer.rebalance_latency_avg",
"kafka.consumer.rebalance_latency_max",
"kafka.consumer.rebalance_latency_total",
"kafka.consumer.rebalance_rate_per_hour",
"kafka.consumer.rebalance_total",
"kafka.consumer.sync_rate",
"kafka.consumer.sync_time_avg",
"kafka.consumer.sync_time_max",
"kafka.consumer.sync_total",
"kafka.consumer.bytes_consumed_rate",
"kafka.consumer.bytes_consumed_total",
"kafka.consumer.fetch_latency_avg",
"kafka.consumer.fetch_latency_max",
"kafka.consumer.fetch_rate",
"kafka.consumer.fetch_size_avg",
"kafka.consumer.fetch_size_max",
"kafka.consumer.fetch_throttle_time_avg",
"kafka.consumer.fetch_throttle_time_max",
"kafka.consumer.fetch_total",
"kafka.consumer.records_consumed_rate",
"kafka.consumer.records_consumed_total",
"kafka.consumer.records_lag",
"kafka.consumer.records_lag_avg",
"kafka.consumer.records_lag_max",
"kafka.consumer.records_lead",
"kafka.consumer.records_lead_avg",
"kafka.consumer.records_lead_min",
"kafka.consumer.records_per_request_avg",
"kafka.consumer.connection_close_rate",
"kafka.consumer.connection_close_total",
"kafka.consumer.connection_count",
"kafka.consumer.connection_creation_rate",
"kafka.consumer.connection_creation_total",
"kafka.consumer.failed_authentication_rate",
"kafka.consumer.failed_authentication_total",
"kafka.consumer.failed_reauthentication_rate",
"kafka.consumer.failed_reauthentication_total",
"kafka.consumer.incoming_byte_rate",
"kafka.consumer.incoming_byte_total",
"kafka.consumer.io_ratio",
"kafka.consumer.io_time_ns_avg",
"kafka.consumer.io_wait_ratio",
"kafka.consumer.io_wait_time_ns_avg",
"kafka.consumer.io_waittime_total",
"kafka.consumer.iotime_total",
"kafka.consumer.last_poll_seconds_ago",
"kafka.consumer.network_io_rate",
"kafka.consumer.network_io_total",
"kafka.consumer.outgoing_byte_rate",
"kafka.consumer.outgoing_byte_total",
"kafka.consumer.poll_idle_ratio_avg",
"kafka.consumer.reauthentication_latency_avg",
"kafka.consumer.reauthentication_latency_max",
"kafka.consumer.request_rate",
"kafka.consumer.request_size_avg",
"kafka.consumer.request_size_max",
"kafka.consumer.request_total",
"kafka.consumer.response_rate",
"kafka.consumer.response_total",
"kafka.consumer.select_rate",
"kafka.consumer.select_total",
"kafka.consumer.successful_authentication_no_reauth_total",
"kafka.consumer.successful_authentication_rate",
"kafka.consumer.successful_authentication_total",
"kafka.consumer.successful_reauthentication_rate",
"kafka.consumer.successful_reauthentication_total",
"kafka.consumer.time_between_poll_avg",
"kafka.consumer.time_between_poll_max",
"kafka.consumer.request_latency_avg",
"kafka.consumer.request_latency_max",
"kafka.producer.batch_size_avg",
"kafka.producer.batch_size_max",
"kafka.producer.batch_split_rate",
"kafka.producer.batch_split_total",
"kafka.producer.buffer_available_bytes",
"kafka.producer.buffer_exhausted_rate",
"kafka.producer.buffer_exhausted_total",
"kafka.producer.buffer_total_bytes",
"kafka.producer.bufferpool_wait_ratio",
"kafka.producer.bufferpool_wait_time_total",
"kafka.producer.compression_rate_avg",
"kafka.producer.connection_close_rate",
"kafka.producer.connection_close_total",
"kafka.producer.connection_count",
"kafka.producer.connection_creation_rate",
"kafka.producer.connection_creation_total",
"kafka.producer.failed_authentication_rate",
"kafka.producer.failed_authentication_total",
"kafka.producer.failed_reauthentication_rate",
"kafka.producer.failed_reauthentication_total",
"kafka.producer.incoming_byte_rate",
"kafka.producer.incoming_byte_total",
"kafka.producer.io_ratio",
"kafka.producer.io_time_ns_avg",
"kafka.producer.io_wait_ratio",
"kafka.producer.io_wait_time_ns_avg",
"kafka.producer.io_waittime_total",
"kafka.producer.iotime_total",
"kafka.producer.metadata_age",
"kafka.producer.network_io_rate",
"kafka.producer.network_io_total",
"kafka.producer.outgoing_byte_rate",
"kafka.producer.outgoing_byte_total",
"kafka.producer.produce_throttle_time_avg",
"kafka.producer.produce_throttle_time_max",
"kafka.producer.reauthentication_latency_avg",
"kafka.producer.reauthentication_latency_max",
"kafka.producer.record_error_rate",
"kafka.producer.record_error_total",
"kafka.producer.record_queue_time_avg",
"kafka.producer.record_queue_time_max",
"kafka.producer.record_retry_rate",
"kafka.producer.record_retry_total",
"kafka.producer.record_send_rate",
"kafka.producer.record_send_total",
"kafka.producer.record_size_avg",
"kafka.producer.record_size_max",
"kafka.producer.records_per_request_avg",
"kafka.producer.request_latency_avg",
"kafka.producer.request_latency_max",
"kafka.producer.request_rate",
"kafka.producer.request_size_avg",
"kafka.producer.request_size_max",
"kafka.producer.request_total",
"kafka.producer.requests_in_flight",
"kafka.producer.response_rate",
"kafka.producer.response_total",
"kafka.producer.select_rate",
"kafka.producer.select_total",
"kafka.producer.successful_authentication_no_reauth_total",
"kafka.producer.successful_authentication_rate",
"kafka.producer.successful_authentication_total",
"kafka.producer.successful_reauthentication_rate",
"kafka.producer.successful_reauthentication_total",
"kafka.producer.waiting_threads",
"kafka.producer.byte_rate",
"kafka.producer.byte_total",
"kafka.producer.compression_rate"));
List<MetricData> metrics = testing.metrics();
Set<String> metricNames = metrics.stream().map(MetricData::getName).collect(toSet());
assertThat(metricNames).containsAll(expectedMetricNames);
assertThat(metrics)
.allSatisfy(
metricData -> {
Set<String> expectedKeys =
metricData.getData().getPoints().stream()
.findFirst()
.map(
point ->
point.getAttributes().asMap().keySet().stream()
.map(AttributeKey::getKey)
.collect(toSet()))
.orElse(Collections.emptySet());
assertThat(metricData.getData().getPoints())
.extracting(PointData::getAttributes)
.extracting(
attributes ->
attributes.asMap().keySet().stream()
.map(AttributeKey::getKey)
.collect(toSet()))
.allSatisfy(attributeKeys -> assertThat(attributeKeys).isEqualTo(expectedKeys));
});
// Print mapping table
printMappingTable();
}
private static void produceRecords() {
for (int i = 0; i < 100; i++) {
producer.send(
new ProducerRecord<>(
TOPICS.get(RANDOM.nextInt(TOPICS.size())),
0,
System.currentTimeMillis(),
"key".getBytes(StandardCharsets.UTF_8),
"value".getBytes(StandardCharsets.UTF_8)));
}
}
private static void consumeRecords() {
consumer.subscribe(TOPICS);
Instant stopTime = Instant.now().plusSeconds(10);
while (Instant.now().isBefore(stopTime)) {
consumer.poll(Duration.ofSeconds(1));
}
}
/**
* Print a table mapping kafka metrics to equivalent OpenTelemetry metrics, in markdown format.
*/
private static void printMappingTable() {
StringBuilder sb = new StringBuilder();
// Append table headers
sb.append(
"| Metric Group | Metric Name | Attribute Keys | Instrument Name | Instrument Description | Instrument Type |")
.append(lineSeparator())
.append(
"|--------------|-------------|----------------|-----------------|------------------------|-----------------|")
.append(lineSeparator());
Map<String, List<KafkaMetricId>> kafkaMetricsByGroup =
TestMetricsReporter.seenMetrics.stream().collect(groupingBy(KafkaMetricId::getGroup));
List<RegisteredObservable> registeredObservables =
OpenTelemetryMetricsReporter.getRegisteredObservables();
// Iterate through groups in alpha order
for (String group : kafkaMetricsByGroup.keySet().stream().sorted().collect(toList())) {
List<KafkaMetricId> kafkaMetricIds =
kafkaMetricsByGroup.get(group).stream()
.sorted(
comparing(KafkaMetricId::getName)
.thenComparing(kafkaMetricId -> kafkaMetricId.getAttributeKeys().size()))
.collect(toList());
// Iterate through metrics in alpha order by name
for (KafkaMetricId kafkaMetricId : kafkaMetricIds) {
// Find first (there may be multiple) registered instrument that matches the kafkaMetricId
Optional<InstrumentDescriptor> descriptor =
registeredObservables.stream()
.filter(
registeredObservable ->
KafkaMetricId.create(registeredObservable.getKafkaMetricName())
.equals(kafkaMetricId))
.findFirst()
.map(RegisteredObservable::getInstrumentDescriptor);
// Append table row
sb.append(
String.format(
"| %s | %s | %s | %s | %s | %s |%n",
"`" + group + "`",
"`" + kafkaMetricId.getName() + "`",
kafkaMetricId.getAttributeKeys().stream()
.map(key -> "`" + key + "`")
.collect(joining(",")),
descriptor.map(i -> "`" + i.getName() + "`").orElse(""),
descriptor.map(InstrumentDescriptor::getDescription).orElse(""),
descriptor.map(i -> "`" + i.getInstrumentType() + "`").orElse("")));
}
}
logger.info("Mapping table" + System.lineSeparator() + sb);
}
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public static class TestMetricsReporter implements MetricsReporter {
private static final Set<KafkaMetricId> seenMetrics = new HashSet<>();
@Override
public void init(List<KafkaMetric> list) {
list.forEach(this::metricChange);
}
@Override
public void metricChange(KafkaMetric kafkaMetric) {
seenMetrics.add(KafkaMetricId.create(kafkaMetric.metricName()));
}
@Override
public void metricRemoval(KafkaMetric kafkaMetric) {}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> map) {}
}
@AutoValue
abstract static class KafkaMetricId {
abstract String getGroup();
abstract String getName();
abstract Set<String> getAttributeKeys();
static KafkaMetricId create(MetricName metricName) {
return new AutoValue_OpenTelemetryMetricsReporterTest_KafkaMetricId(
metricName.group(), metricName.name(), metricName.tags().keySet());
}
} }
} }

View File

@ -8,6 +8,8 @@ package io.opentelemetry.instrumentation.kafka.internal;
import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterBuilder;
import io.opentelemetry.instrumentation.api.internal.EmbeddedInstrumentationProperties;
import io.opentelemetry.instrumentation.api.internal.GuardedBy; import io.opentelemetry.instrumentation.api.internal.GuardedBy;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
@ -34,6 +36,8 @@ import org.apache.kafka.common.metrics.MetricsReporter;
public final class OpenTelemetryMetricsReporter implements MetricsReporter { public final class OpenTelemetryMetricsReporter implements MetricsReporter {
public static final String CONFIG_KEY_OPENTELEMETRY_INSTANCE = "opentelemetry.instance"; public static final String CONFIG_KEY_OPENTELEMETRY_INSTANCE = "opentelemetry.instance";
public static final String CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME =
"opentelemetry.instrumentation_name";
private static final Logger logger = private static final Logger logger =
Logger.getLogger(OpenTelemetryMetricsReporter.class.getName()); Logger.getLogger(OpenTelemetryMetricsReporter.class.getName());
@ -146,17 +150,30 @@ public final class OpenTelemetryMetricsReporter implements MetricsReporter {
@Override @Override
public void configure(Map<String, ?> configs) { public void configure(Map<String, ?> configs) {
Object openTelemetry = configs.get(CONFIG_KEY_OPENTELEMETRY_INSTANCE); OpenTelemetry openTelemetry =
if (openTelemetry == null) { getProperty(configs, CONFIG_KEY_OPENTELEMETRY_INSTANCE, OpenTelemetry.class);
throw new IllegalStateException( String instrumentationName =
"Missing required configuration property: " + CONFIG_KEY_OPENTELEMETRY_INSTANCE); getProperty(configs, CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME, String.class);
String instrumentationVersion =
EmbeddedInstrumentationProperties.findVersion(instrumentationName);
MeterBuilder meterBuilder = openTelemetry.meterBuilder(instrumentationName);
if (instrumentationVersion != null) {
meterBuilder.setInstrumentationVersion(instrumentationVersion);
} }
if (!(openTelemetry instanceof OpenTelemetry)) { meter = meterBuilder.build();
throw new IllegalStateException(
"Configuration property "
+ CONFIG_KEY_OPENTELEMETRY_INSTANCE
+ " is not instance of OpenTelemetry");
} }
meter = ((OpenTelemetry) openTelemetry).getMeter("io.opentelemetry.kafka-clients");
@SuppressWarnings("unchecked")
private static <T> T getProperty(Map<String, ?> configs, String key, Class<T> requiredType) {
Object value = configs.get(key);
if (value == null) {
throw new IllegalStateException("Missing required configuration property: " + key);
}
if (!requiredType.isInstance(value)) {
throw new IllegalStateException(
"Configuration property " + key + " is not instance of " + requiredType.getSimpleName());
}
return (T) value;
} }
} }