Move kafka metrics to separate instrumentation module (#9862)

This commit is contained in:
Lauri Tulmin 2023-11-16 13:52:30 +02:00 committed by GitHub
parent 7f0b079fed
commit 977a6f9c38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 218 additions and 111 deletions

View File

@ -8,6 +8,7 @@ muzzle {
module.set("kafka-clients")
versions.set("[0.11.0.0,)")
assertInverse.set(true)
excludeInstrumentationName("kafka-clients-metrics")
}
}

View File

@ -7,8 +7,6 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11;
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.consumerReceiveInstrumenter;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.enhanceConfig;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
@ -24,9 +22,6 @@ import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTra
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@ -43,12 +38,6 @@ public class KafkaConsumerInstrumentation implements TypeInstrumentation {
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Map.class)),
this.getClass().getName() + "$ConstructorMapAdvice");
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Properties.class)),
this.getClass().getName() + "$ConstructorPropertiesAdvice");
transformer.applyAdviceToMethod(
named("poll")
.and(isPublic())
@ -58,29 +47,6 @@ public class KafkaConsumerInstrumentation implements TypeInstrumentation {
this.getClass().getName() + "$PollAdvice");
}
@SuppressWarnings("unused")
public static class ConstructorMapAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
// ensure config is a mutable map
if (config.getClass() != HashMap.class) {
config = new HashMap<>(config);
}
enhanceConfig(config);
}
}
@SuppressWarnings("unused")
public static class ConstructorPropertiesAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(0) Properties config) {
enhanceConfig(config);
}
}
@SuppressWarnings("unused")
public static class PollAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)

View File

@ -5,9 +5,7 @@
package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.enhanceConfig;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.producerInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
@ -20,9 +18,6 @@ import io.opentelemetry.instrumentation.kafka.internal.KafkaPropagation;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@ -39,12 +34,6 @@ public class KafkaProducerInstrumentation implements TypeInstrumentation {
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Map.class)),
this.getClass().getName() + "$ConstructorMapAdvice");
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Properties.class)),
this.getClass().getName() + "$ConstructorPropertiesAdvice");
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
@ -54,29 +43,6 @@ public class KafkaProducerInstrumentation implements TypeInstrumentation {
KafkaProducerInstrumentation.class.getName() + "$SendAdvice");
}
@SuppressWarnings("unused")
public static class ConstructorMapAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
// ensure config is a mutable map
if (config.getClass() != HashMap.class) {
config = new HashMap<>(config);
}
enhanceConfig(config);
}
}
@SuppressWarnings("unused")
public static class ConstructorPropertiesAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(0) Properties config) {
enhanceConfig(config);
}
}
@SuppressWarnings("unused")
public static class SendAdvice {

View File

@ -11,15 +11,9 @@ import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest;
import io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest;
import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier;
import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.RecordMetadata;
public final class KafkaSingletons {
@ -31,9 +25,6 @@ public final class KafkaSingletons {
"otel.instrumentation.kafka.client-propagation.enabled",
"otel.instrumentation.kafka.producer-propagation.enabled",
true);
private static final boolean METRICS_ENABLED =
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.metric-reporter.enabled", true);
private static final Instrumenter<KafkaProducerRequest, RecordMetadata> PRODUCER_INSTRUMENTER;
private static final Instrumenter<KafkaReceiveRequest, Void> CONSUMER_RECEIVE_INSTRUMENTER;
@ -69,39 +60,5 @@ public final class KafkaSingletons {
return CONSUMER_PROCESS_INSTRUMENTER;
}
@SuppressWarnings("unchecked")
public static void enhanceConfig(Map<? super String, Object> config) {
// skip enhancing configuration when metrics are disabled or when we have already enhanced it
if (!METRICS_ENABLED
|| config.get(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME)
!= null) {
return;
}
config.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
OpenTelemetryMetricsReporter.class.getName(),
(class1, class2) -> {
// class1 is either a class name or List of class names or classes
if (class1 instanceof List) {
List<Object> result = new ArrayList<>();
result.addAll((List<Object>) class1);
result.add(class2);
return result;
} else if (class1 instanceof String) {
String className1 = (String) class1;
if (className1.isEmpty()) {
return class2;
}
}
return class1 + "," + class2;
});
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
new OpenTelemetrySupplier(GlobalOpenTelemetry.get()));
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME,
INSTRUMENTATION_NAME);
}
private KafkaSingletons() {}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.metrics;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.metrics.KafkaMetricsUtil.enhanceConfig;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
public class KafkaMetricsConsumerInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.kafka.clients.consumer.KafkaConsumer");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Map.class)),
this.getClass().getName() + "$ConstructorMapAdvice");
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Properties.class)),
this.getClass().getName() + "$ConstructorPropertiesAdvice");
}
@SuppressWarnings("unused")
public static class ConstructorMapAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
// ensure config is a mutable map
if (config.getClass() != HashMap.class) {
config = new HashMap<>(config);
}
enhanceConfig(config);
}
}
@SuppressWarnings("unused")
public static class ConstructorPropertiesAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(0) Properties config) {
enhanceConfig(config);
}
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.metrics;
import static java.util.Arrays.asList;
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;
@AutoService(InstrumentationModule.class)
public class KafkaMetricsInstrumentationModule extends InstrumentationModule {
public KafkaMetricsInstrumentationModule() {
super(
"kafka-clients-metrics",
"kafka-clients",
"kafka-clients-metrics-0.11",
"kafka-clients-0.11",
"kafka");
}
@Override
public boolean isIndyModule() {
// OpenTelemetryMetricsReporter is not available in app class loader
return false;
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new KafkaMetricsProducerInstrumentation(), new KafkaMetricsConsumerInstrumentation());
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.metrics;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.metrics.KafkaMetricsUtil.enhanceConfig;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
public class KafkaMetricsProducerInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.kafka.clients.producer.KafkaProducer");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Map.class)),
this.getClass().getName() + "$ConstructorMapAdvice");
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Properties.class)),
this.getClass().getName() + "$ConstructorPropertiesAdvice");
}
@SuppressWarnings("unused")
public static class ConstructorMapAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
// ensure config is a mutable map
if (config.getClass() != HashMap.class) {
config = new HashMap<>(config);
}
enhanceConfig(config);
}
}
@SuppressWarnings("unused")
public static class ConstructorPropertiesAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(0) Properties config) {
enhanceConfig(config);
}
}
}

View File

@ -0,0 +1,58 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.metrics;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
public final class KafkaMetricsUtil {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11";
private static final boolean METRICS_ENABLED =
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.metric-reporter.enabled", true);
@SuppressWarnings("unchecked")
public static void enhanceConfig(Map<? super String, Object> config) {
// skip enhancing configuration when metrics are disabled or when we have already enhanced it
if (!METRICS_ENABLED
|| config.get(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME)
!= null) {
return;
}
config.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
OpenTelemetryMetricsReporter.class.getName(),
(class1, class2) -> {
// class1 is either a class name or List of class names or classes
if (class1 instanceof List) {
List<Object> result = new ArrayList<>();
result.addAll((List<Object>) class1);
result.add(class2);
return result;
} else if (class1 instanceof String) {
String className1 = (String) class1;
if (className1.isEmpty()) {
return class2;
}
}
return class1 + "," + class2;
});
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
new OpenTelemetrySupplier(GlobalOpenTelemetry.get()));
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME,
INSTRUMENTATION_NAME);
}
private KafkaMetricsUtil() {}
}