From e7db2c02461ca46fd722026f80ce1be1948d245b Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Tue, 17 Oct 2023 16:47:11 +0300 Subject: [PATCH] Allow enabling receive telemetry in kafka library instrumentation (#9693) --- .../v2_6/KafkaTelemetryBuilder.java | 17 ++++++++++++++++- .../kafka/v2_7/SpringKafkaTelemetryBuilder.java | 6 ++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetryBuilder.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetryBuilder.java index 87937be28d..fc468ca762 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetryBuilder.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetryBuilder.java @@ -30,6 +30,7 @@ public final class KafkaTelemetryBuilder { private List capturedHeaders = emptyList(); private boolean captureExperimentalSpanAttributes = false; private boolean propagationEnabled = true; + private boolean messagingReceiveInstrumentationEnabled = false; KafkaTelemetryBuilder(OpenTelemetry openTelemetry) { this.openTelemetry = Objects.requireNonNull(openTelemetry); @@ -85,11 +86,25 @@ public final class KafkaTelemetryBuilder { return this; } + /** + * Set whether to capture the consumer message receive telemetry in messaging instrumentation. + * + *

Note that this will cause the consumer side to start a new trace, with only a span link + * connecting it to the producer trace. + */ + @CanIgnoreReturnValue + public KafkaTelemetryBuilder setMessagingReceiveInstrumentationEnabled( + boolean messagingReceiveInstrumentationEnabled) { + this.messagingReceiveInstrumentationEnabled = messagingReceiveInstrumentationEnabled; + return this; + } + public KafkaTelemetry build() { KafkaInstrumenterFactory instrumenterFactory = new KafkaInstrumenterFactory(openTelemetry, INSTRUMENTATION_NAME) .setCapturedHeaders(capturedHeaders) - .setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes); + .setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes) + .setMessagingReceiveInstrumentationEnabled(messagingReceiveInstrumentationEnabled); return new KafkaTelemetry( openTelemetry, diff --git a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetryBuilder.java b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetryBuilder.java index b65725b5e6..0edd8a78f5 100644 --- a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetryBuilder.java +++ b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetryBuilder.java @@ -50,6 +50,12 @@ public final class SpringKafkaTelemetryBuilder { return this; } + /** + * Set whether to capture the consumer message receive telemetry in messaging instrumentation. + * + *

Note that this will cause the consumer side to start a new trace, with only a span link + * connecting it to the producer trace. + */ @CanIgnoreReturnValue public SpringKafkaTelemetryBuilder setMessagingReceiveInstrumentationEnabled( boolean messagingReceiveInstrumentationEnabled) {