diff --git a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java index 8f67d30e85..308c5e99fb 100644 --- a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java +++ b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java @@ -92,7 +92,7 @@ public final class KafkaConsumerInstrumentation extends Instrumenter.Default { @Advice.OnMethodExit(suppress = Throwable.class) public static void wrap(@Advice.Return(readOnly = false) Iterable iterable) { if (iterable != null) { - iterable = new TracingIterable(iterable, "kafka.consume", CONSUMER_DECORATE); + iterable = new TracingIterable(iterable, CONSUMER_DECORATE); } } } @@ -102,7 +102,7 @@ public final class KafkaConsumerInstrumentation extends Instrumenter.Default { @Advice.OnMethodExit(suppress = Throwable.class) public static void wrap(@Advice.Return(readOnly = false) List iterable) { if (iterable != null) { - iterable = new TracingList(iterable, "kafka.consume", CONSUMER_DECORATE); + iterable = new TracingList(iterable, CONSUMER_DECORATE); } } } @@ -112,7 +112,7 @@ public final class KafkaConsumerInstrumentation extends Instrumenter.Default { @Advice.OnMethodExit(suppress = Throwable.class) public static void wrap(@Advice.Return(readOnly = false) Iterator iterator) { if (iterator != null) { - iterator = new TracingIterator(iterator, "kafka.consume", CONSUMER_DECORATE); + iterator = new TracingIterator(iterator, CONSUMER_DECORATE); } } } diff --git a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/KafkaDecorator.java b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/KafkaDecorator.java index 6d728fc0b3..2eb14d360c 100644 --- a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/KafkaDecorator.java +++ b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/KafkaDecorator.java @@ -17,7 +17,6 @@ package io.opentelemetry.auto.instrumentation.kafka_clients; import io.opentelemetry.OpenTelemetry; import io.opentelemetry.auto.bootstrap.instrumentation.decorator.ClientDecorator; -import io.opentelemetry.auto.instrumentation.api.MoreTags; import io.opentelemetry.auto.instrumentation.api.SpanTypes; import io.opentelemetry.trace.Span; import io.opentelemetry.trace.Tracer; @@ -54,24 +53,35 @@ public abstract class KafkaDecorator extends ClientDecorator { return "java-kafka"; } - public void onConsume(final Span span, final ConsumerRecord record) { - if (record != null) { - final String topic = record.topic() == null ? "kafka" : record.topic(); - span.setAttribute(MoreTags.RESOURCE_NAME, "Consume Topic " + topic); - span.setAttribute("partition", record.partition()); - span.setAttribute("offset", record.offset()); + public String spanNameOnConsume(final ConsumerRecord record) { + final String topic = record.topic(); + if (topic != null) { + return topic; + } else { + return "destination"; } } + public String spanNameOnProduce(final ProducerRecord record) { + if (record != null) { + final String topic = record.topic(); + if (topic != null) { + return topic; + } + } + return "destination"; + } + + public void onConsume(final Span span, final ConsumerRecord record) { + span.setAttribute("partition", record.partition()); + span.setAttribute("offset", record.offset()); + } + public void onProduce(final Span span, final ProducerRecord record) { if (record != null) { - - final String topic = record.topic() == null ? "kafka" : record.topic(); if (record.partition() != null) { span.setAttribute("kafka.partition", record.partition()); } - - span.setAttribute(MoreTags.RESOURCE_NAME, "Produce Topic " + topic); } } } diff --git a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 55a9610dac..836dc52305 100644 --- a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -82,7 +82,11 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default { @Advice.FieldValue("apiVersions") final ApiVersions apiVersions, @Advice.Argument(value = 0, readOnly = false) ProducerRecord record, @Advice.Argument(value = 1, readOnly = false) Callback callback) { - final Span span = TRACER.spanBuilder("kafka.produce").setSpanKind(PRODUCER).startSpan(); + final Span span = + TRACER + .spanBuilder(PRODUCER_DECORATE.spanNameOnProduce(record)) + .setSpanKind(PRODUCER) + .startSpan(); PRODUCER_DECORATE.afterStart(span); PRODUCER_DECORATE.onProduce(span, record); diff --git a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/TracingIterable.java b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/TracingIterable.java index 79c09a408c..d0e99d6778 100644 --- a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/TracingIterable.java +++ b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/TracingIterable.java @@ -20,16 +20,11 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; public class TracingIterable implements Iterable { private final Iterable delegate; - private final String operationName; private final KafkaDecorator decorator; private boolean firstIterator = true; - public TracingIterable( - final Iterable delegate, - final String operationName, - final KafkaDecorator decorator) { + public TracingIterable(final Iterable delegate, final KafkaDecorator decorator) { this.delegate = delegate; - this.operationName = operationName; this.decorator = decorator; } @@ -40,7 +35,7 @@ public class TracingIterable implements Iterable { // However, this is not thread-safe, but usually the first (hopefully only) traversal of // ConsumerRecords is performed in the same thread that called poll() if (firstIterator) { - it = new TracingIterator(delegate.iterator(), operationName, decorator); + it = new TracingIterator(delegate.iterator(), decorator); firstIterator = false; } else { it = delegate.iterator(); diff --git a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/TracingIterator.java b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/TracingIterator.java index cb7c9de556..479d7a6058 100644 --- a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/TracingIterator.java +++ b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/TracingIterator.java @@ -29,7 +29,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; @Slf4j public class TracingIterator implements Iterator { private final Iterator delegateIterator; - private final String operationName; private final KafkaDecorator decorator; /** @@ -39,11 +38,8 @@ public class TracingIterator implements Iterator { private SpanWithScope currentSpanWithScope; public TracingIterator( - final Iterator delegateIterator, - final String operationName, - final KafkaDecorator decorator) { + final Iterator delegateIterator, final KafkaDecorator decorator) { this.delegateIterator = delegateIterator; - this.operationName = operationName; this.decorator = decorator; } @@ -71,7 +67,7 @@ public class TracingIterator implements Iterator { try { if (next != null) { final boolean consumer = !TRACER.getCurrentSpan().getContext().isValid(); - final Span.Builder spanBuilder = TRACER.spanBuilder(operationName); + final Span.Builder spanBuilder = TRACER.spanBuilder(decorator.spanNameOnConsume(next)); if (consumer) { spanBuilder.setSpanKind(CONSUMER); } diff --git a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/TracingList.java b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/TracingList.java index c5ccde9f3f..37bb3b546d 100644 --- a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/TracingList.java +++ b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_clients/TracingList.java @@ -23,11 +23,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; public class TracingList extends TracingIterable implements List { private final List delegate; - public TracingList( - final List delegate, - final String operationName, - final KafkaDecorator decorator) { - super(delegate, operationName, decorator); + public TracingList(final List delegate, final KafkaDecorator decorator) { + super(delegate, decorator); this.delegate = delegate; } diff --git a/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy b/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy index c050c2a13d..f937c79c7d 100644 --- a/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy +++ b/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy @@ -99,25 +99,23 @@ class KafkaClientTest extends AgentTestRunner { assertTraces(1) { trace(0, 2) { span(0) { - operationName "kafka.produce" + operationName SHARED_TOPIC spanKind PRODUCER errored false parent() tags { "$MoreTags.SERVICE_NAME" "kafka" - "$MoreTags.RESOURCE_NAME" "Produce Topic $SHARED_TOPIC" "$MoreTags.SPAN_TYPE" "queue" "$Tags.COMPONENT" "java-kafka" } } span(1) { - operationName "kafka.consume" + operationName SHARED_TOPIC spanKind CONSUMER errored false childOf span(0) tags { "$MoreTags.SERVICE_NAME" "kafka" - "$MoreTags.RESOURCE_NAME" "Consume Topic $SHARED_TOPIC" "$MoreTags.SPAN_TYPE" "queue" "$Tags.COMPONENT" "java-kafka" "partition" { it >= 0 } @@ -170,26 +168,24 @@ class KafkaClientTest extends AgentTestRunner { assertTraces(1) { trace(0, 2) { span(0) { - operationName "kafka.produce" + operationName SHARED_TOPIC spanKind PRODUCER errored false parent() tags { "$MoreTags.SERVICE_NAME" "kafka" - "$MoreTags.RESOURCE_NAME" "Produce Topic $SHARED_TOPIC" "$MoreTags.SPAN_TYPE" "queue" "$Tags.COMPONENT" "java-kafka" "kafka.partition" { it >= 0 } } } span(1) { - operationName "kafka.consume" + operationName SHARED_TOPIC spanKind CONSUMER errored false childOf span(0) tags { "$MoreTags.SERVICE_NAME" "kafka" - "$MoreTags.RESOURCE_NAME" "Consume Topic $SHARED_TOPIC" "$MoreTags.SPAN_TYPE" "queue" "$Tags.COMPONENT" "java-kafka" "partition" { it >= 0 } diff --git a/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/KafkaStreamsDecorator.java b/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/KafkaStreamsDecorator.java index f2e47c6ac4..c2fa361435 100644 --- a/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/KafkaStreamsDecorator.java +++ b/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/KafkaStreamsDecorator.java @@ -17,7 +17,6 @@ package io.opentelemetry.auto.instrumentation.kafka_streams; import io.opentelemetry.OpenTelemetry; import io.opentelemetry.auto.bootstrap.instrumentation.decorator.ClientDecorator; -import io.opentelemetry.auto.instrumentation.api.MoreTags; import io.opentelemetry.auto.instrumentation.api.SpanTypes; import io.opentelemetry.trace.Span; import io.opentelemetry.trace.Tracer; @@ -44,10 +43,20 @@ public class KafkaStreamsDecorator extends ClientDecorator { return SpanTypes.MESSAGE_CONSUMER; } + public String spanNameForConsume(final StampedRecord record) { + if (record == null) { + return null; + } + final String topic = record.topic(); + if (topic != null) { + return topic; + } else { + return "destination"; + } + } + public void onConsume(final Span span, final StampedRecord record) { if (record != null) { - final String topic = record.topic() == null ? "kafka" : record.topic(); - span.setAttribute(MoreTags.RESOURCE_NAME, "Consume Topic " + topic); span.setAttribute("partition", record.partition()); span.setAttribute("offset", record.offset()); } diff --git a/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java b/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java index 3f9ebe0ab6..3b216ca7d7 100644 --- a/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java +++ b/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java @@ -103,7 +103,8 @@ public class KafkaStreamsProcessorInstrumentation { return; } - final Span.Builder spanBuilder = TRACER.spanBuilder("kafka.consume").setSpanKind(CONSUMER); + final Span.Builder spanBuilder = + TRACER.spanBuilder(CONSUMER_DECORATE.spanNameForConsume(record)).setSpanKind(CONSUMER); try { final SpanContext extractedContext = TRACER.getHttpTextFormat().extract(record.value.headers(), GETTER); diff --git a/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy b/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy index cf8ab28ec9..bc3df9968c 100644 --- a/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy +++ b/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy @@ -138,26 +138,24 @@ class KafkaStreamsTest extends AgentTestRunner { trace(0, 5) { // PRODUCER span 0 span(0) { - operationName "kafka.produce" + operationName STREAM_PENDING spanKind PRODUCER errored false parent() tags { "$MoreTags.SERVICE_NAME" "kafka" - "$MoreTags.RESOURCE_NAME" "Produce Topic $STREAM_PENDING" "$MoreTags.SPAN_TYPE" "queue" "$Tags.COMPONENT" "java-kafka" } } // CONSUMER span 0 span(1) { - operationName "kafka.consume" + operationName STREAM_PENDING spanKind CONSUMER errored false childOf span(0) tags { "$MoreTags.SERVICE_NAME" "kafka" - "$MoreTags.RESOURCE_NAME" "Consume Topic $STREAM_PENDING" "$MoreTags.SPAN_TYPE" "queue" "$Tags.COMPONENT" "java-kafka" "partition" { it >= 0 } @@ -166,13 +164,12 @@ class KafkaStreamsTest extends AgentTestRunner { } // STREAMING span 1 span(2) { - operationName "kafka.consume" + operationName STREAM_PENDING spanKind CONSUMER errored false childOf span(0) tags { "$MoreTags.SERVICE_NAME" "kafka" - "$MoreTags.RESOURCE_NAME" "Consume Topic $STREAM_PENDING" "$MoreTags.SPAN_TYPE" "queue" "$Tags.COMPONENT" "java-kafka" "partition" { it >= 0 } @@ -182,26 +179,24 @@ class KafkaStreamsTest extends AgentTestRunner { } // STREAMING span 0 span(3) { - operationName "kafka.produce" + operationName STREAM_PROCESSED spanKind PRODUCER errored false childOf span(2) tags { "$MoreTags.SERVICE_NAME" "kafka" - "$MoreTags.RESOURCE_NAME" "Produce Topic $STREAM_PROCESSED" "$MoreTags.SPAN_TYPE" "queue" "$Tags.COMPONENT" "java-kafka" } } // CONSUMER span 0 span(4) { - operationName "kafka.consume" + operationName STREAM_PROCESSED spanKind CONSUMER errored false childOf span(3) tags { "$MoreTags.SERVICE_NAME" "kafka" - "$MoreTags.RESOURCE_NAME" "Consume Topic $STREAM_PROCESSED" "$MoreTags.SPAN_TYPE" "queue" "$Tags.COMPONENT" "java-kafka" "partition" { it >= 0 }