diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientDefaultTest.groovy b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientDefaultTest.groovy deleted file mode 100644 index b65e96a267..0000000000 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientDefaultTest.groovy +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.kafkaclients - -import io.opentelemetry.sdk.trace.data.SpanData -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import java.nio.charset.StandardCharsets -import org.apache.kafka.clients.producer.ProducerRecord - -import java.time.Duration -import java.util.concurrent.TimeUnit - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import static io.opentelemetry.api.trace.SpanKind.INTERNAL -import static io.opentelemetry.api.trace.SpanKind.PRODUCER - -class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest { - - def "test kafka produce and consume, test headers: #testHeaders"() { - when: - String greeting = "Hello Kafka!" - runWithSpan("parent") { - def producerRecord = new ProducerRecord(SHARED_TOPIC, greeting) - if (testHeaders) { - producerRecord.headers().add("test-message-header", "test".getBytes(StandardCharsets.UTF_8)) - } - producer.send(producerRecord) { meta, ex -> - if (ex == null) { - runWithSpan("producer callback") {} - } else { - runWithSpan("producer exception: " + ex) {} - } - }.get(5, TimeUnit.SECONDS) - } - - then: - awaitUntilConsumerIsReady() - def records = consumer.poll(Duration.ofSeconds(5).toMillis()) - records.count() == 1 - - // iterate over records to generate spans - for (record in records) { - runWithSpan("processing") { - assert record.value() == greeting - assert record.key() == null - } - } - - assertTraces(2) { - traces.sort(orderByRootSpanKind(INTERNAL, CONSUMER)) - - SpanData producerSpan - - trace(0, 3) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - span(1) { - name SHARED_TOPIC + " send" - kind PRODUCER - childOf span(0) - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 } - "messaging.kafka.message.offset" { it >= 0 } - if (testHeaders) { - "messaging.header.test_message_header" { it == ["test"] } - } - } - } - span(2) { - name "producer callback" - kind INTERNAL - childOf span(0) - } - - producerSpan = span(1) - } - trace(1, 3) { - span(0) { - name SHARED_TOPIC + " receive" - kind CONSUMER - hasNoParent() - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_OPERATION" "receive" - if (testHeaders) { - "messaging.header.test_message_header" { it == ["test"] } - } - } - } - span(1) { - name SHARED_TOPIC + " process" - kind CONSUMER - childOf span(0) - hasLink producerSpan - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_OPERATION" "process" - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long - "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 } - "messaging.kafka.message.offset" { it >= 0 } - "kafka.record.queue_time_ms" { it >= 0 } - if (testHeaders) { - "messaging.header.test_message_header" { it == ["test"] } - } - } - } - span(2) { - name "processing" - childOf span(1) - } - } - } - - where: - testHeaders << [false, true] - } - - def "test pass through tombstone"() { - when: - producer.send(new ProducerRecord<>(SHARED_TOPIC, null)).get(5, TimeUnit.SECONDS) - - then: - awaitUntilConsumerIsReady() - def records = consumer.poll(Duration.ofSeconds(5).toMillis()) - records.count() == 1 - - // iterate over records to generate spans - for (record in records) { - assert record.value() == null - assert record.key() == null - } - - assertTraces(2) { - traces.sort(orderByRootSpanKind(PRODUCER, CONSUMER)) - - SpanData producerSpan - - trace(0, 1) { - span(0) { - name SHARED_TOPIC + " send" - kind PRODUCER - hasNoParent() - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE" true - "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 } - "messaging.kafka.message.offset" { it >= 0 } - } - } - - producerSpan = span(0) - } - trace(1, 2) { - span(0) { - name SHARED_TOPIC + " receive" - kind CONSUMER - hasNoParent() - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_OPERATION" "receive" - } - } - span(1) { - name SHARED_TOPIC + " process" - kind CONSUMER - childOf span(0) - hasLink producerSpan - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_OPERATION" "process" - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long - "$SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE" true - "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 } - "messaging.kafka.message.offset" { it >= 0 } - "kafka.record.queue_time_ms" { it >= 0 } - } - } - } - } - } - - def "test records(TopicPartition) kafka consume"() { - setup: - def partition = 0 - - when: "send message" - def greeting = "Hello from MockConsumer!" - producer.send(new ProducerRecord<>(SHARED_TOPIC, partition, null, greeting)).get(5, TimeUnit.SECONDS) - - then: "wait for PRODUCER span" - waitForTraces(1) - - when: "receive messages" - awaitUntilConsumerIsReady() - def consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis()) - def recordsInPartition = consumerRecords.records(KafkaClientBaseTest.topicPartition) - recordsInPartition.size() == 1 - - // iterate over records to generate spans - for (record in recordsInPartition) { - assert record.value() == greeting - assert record.key() == null - } - - then: - assertTraces(2) { - traces.sort(orderByRootSpanKind(PRODUCER, CONSUMER)) - - SpanData producerSpan - - trace(0, 1) { - span(0) { - name SHARED_TOPIC + " send" - kind PRODUCER - hasNoParent() - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" partition - "messaging.kafka.message.offset" { it >= 0 } - } - } - - producerSpan = span(0) - } - trace(1, 2) { - span(0) { - name SHARED_TOPIC + " receive" - kind CONSUMER - hasNoParent() - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_OPERATION" "receive" - } - } - span(1) { - name SHARED_TOPIC + " process" - kind CONSUMER - childOf span(0) - hasLink producerSpan - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_OPERATION" "process" - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long - "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" partition - "messaging.kafka.message.offset" { it >= 0 } - "kafka.record.queue_time_ms" { it >= 0 } - } - } - } - } - } -} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientPropagationDisabledTest.groovy b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientPropagationDisabledTest.groovy deleted file mode 100644 index 5d52dc788c..0000000000 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientPropagationDisabledTest.groovy +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.kafkaclients - -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import org.apache.kafka.clients.producer.ProducerRecord - -import java.time.Duration - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import static io.opentelemetry.api.trace.SpanKind.PRODUCER - -class KafkaClientPropagationDisabledTest extends KafkaClientPropagationBaseTest { - - def "should not read remote context when consuming messages if propagation is disabled"() { - when: "send message" - String message = "Testing without headers" - producer.send(new ProducerRecord<>(SHARED_TOPIC, message)) - - then: "producer span is created" - assertTraces(1) { - trace(0, 1) { - span(0) { - name SHARED_TOPIC + " send" - kind PRODUCER - hasNoParent() - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 } - "messaging.kafka.message.offset" { it >= 0 } - } - } - } - } - - when: "read message without context propagation" - awaitUntilConsumerIsReady() - def records = consumer.poll(Duration.ofSeconds(5).toMillis()) - records.count() == 1 - - // iterate over records to generate spans - for (record in records) { - runWithSpan("processing") {} - } - - then: - assertTraces(2) { - trace(0, 1) { - span(0) { - name SHARED_TOPIC + " send" - kind PRODUCER - hasNoParent() - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 } - "messaging.kafka.message.offset" { it >= 0 } - } - } - } - trace(1, 2) { - span(0) { - name SHARED_TOPIC + " process" - kind CONSUMER - hasNoLinks() - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_OPERATION" "process" - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long - "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 } - "messaging.kafka.message.offset" { it >= 0 } - "kafka.record.queue_time_ms" { it >= 0 } - } - span(1) { - name "processing" - childOf span(0) - } - } - } - } - } -} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientSuppressReceiveSpansTest.groovy b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientSuppressReceiveSpansTest.groovy deleted file mode 100644 index 92d0d82500..0000000000 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientSuppressReceiveSpansTest.groovy +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.kafkaclients - -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.TopicPartition - -import java.time.Duration - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import static io.opentelemetry.api.trace.SpanKind.INTERNAL -import static io.opentelemetry.api.trace.SpanKind.PRODUCER - -class KafkaClientSuppressReceiveSpansTest extends KafkaClientPropagationBaseTest { - - def "test kafka produce and consume"() { - when: - String greeting = "Hello Kafka!" - runWithSpan("parent") { - producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex -> - if (ex == null) { - runWithSpan("producer callback") {} - } else { - runWithSpan("producer exception: " + ex) {} - } - } - } - - then: - awaitUntilConsumerIsReady() - // check that the message was received - def records = consumer.poll(Duration.ofSeconds(5).toMillis()) - for (record in records) { - runWithSpan("processing") { - assert record.value() == greeting - assert record.key() == null - } - } - - assertTraces(1) { - trace(0, 5) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - span(1) { - name SHARED_TOPIC + " send" - kind PRODUCER - childOf span(0) - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 } - "messaging.kafka.message.offset" { it >= 0 } - } - } - span(2) { - name SHARED_TOPIC + " process" - kind CONSUMER - childOf span(1) - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_OPERATION" "process" - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long - "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 } - "messaging.kafka.message.offset" { it >= 0 } - "kafka.record.queue_time_ms" { it >= 0 } - } - } - span(3) { - name "processing" - childOf span(2) - } - span(4) { - name "producer callback" - kind INTERNAL - childOf span(0) - } - } - } - } - - def "test pass through tombstone"() { - when: - producer.send(new ProducerRecord<>(SHARED_TOPIC, null)) - - then: - awaitUntilConsumerIsReady() - // check that the message was received - def records = consumer.poll(Duration.ofSeconds(5).toMillis()) - for (record in records) { - assert record.value() == null - assert record.key() == null - } - - assertTraces(1) { - trace(0, 2) { - span(0) { - name SHARED_TOPIC + " send" - kind PRODUCER - hasNoParent() - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE" true - "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 } - "messaging.kafka.message.offset" { it >= 0 } - } - } - span(1) { - name SHARED_TOPIC + " process" - kind CONSUMER - childOf span(0) - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_OPERATION" "process" - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long - "$SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE" true - "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 } - "messaging.kafka.message.offset" { it >= 0 } - "kafka.record.queue_time_ms" { it >= 0 } - } - } - } - } - } - - def "test records(TopicPartition) kafka consume"() { - setup: - def partition = 0 - - when: "send message" - def greeting = "Hello from MockConsumer!" - producer.send(new ProducerRecord<>(SHARED_TOPIC, partition, null, greeting)) - - then: "wait for PRODUCER span" - waitForTraces(1) - awaitUntilConsumerIsReady() - - when: "receive messages" - def consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis()) - def recordsInPartition = consumerRecords.records(new TopicPartition(SHARED_TOPIC, partition)) - for (record in recordsInPartition) { - assert record.value() == greeting - assert record.key() == null - } - - then: - assertTraces(1) { - trace(0, 2) { - span(0) { - name SHARED_TOPIC + " send" - kind PRODUCER - hasNoParent() - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" partition - "messaging.kafka.message.offset" { it >= 0 } - } - } - span(1) { - name SHARED_TOPIC + " process" - kind CONSUMER - childOf span(0) - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_OPERATION" "process" - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long - "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" partition - "messaging.kafka.message.offset" { it >= 0 } - "kafka.record.queue_time_ms" { it >= 0 } - } - } - } - } - } -} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/instrumentation/kafkaclients/KafkaClientDefaultTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/instrumentation/kafkaclients/KafkaClientDefaultTest.java new file mode 100644 index 0000000000..d2afc24a31 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/instrumentation/kafkaclients/KafkaClientDefaultTest.java @@ -0,0 +1,329 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients; + +import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.kafka.internal.KafkaClientBaseTest; +import io.opentelemetry.instrumentation.kafka.internal.KafkaClientPropagationBaseTest; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.assertj.core.api.AbstractLongAssert; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @DisplayName("test kafka produce and consume") + @ParameterizedTest(name = "{index} => test headers: {0}") + @ValueSource(booleans = {true, false}) + void testKafkaProducerAndConsumerSpan(boolean testHeaders) throws Exception { + String greeting = "Hello Kafka!"; + testing.runWithSpan( + "parent", + () -> { + ProducerRecord producerRecord = + new ProducerRecord<>(SHARED_TOPIC, greeting); + if (testHeaders) { + producerRecord + .headers() + .add("test-message-header", "test".getBytes(StandardCharsets.UTF_8)); + } + producer + .send( + producerRecord, + (meta, ex) -> { + if (ex == null) { + testing.runWithSpan("producer callback", () -> {}); + } else { + testing.runWithSpan("producer exception: " + ex, () -> {}); + } + }) + .get(5, TimeUnit.SECONDS); + }); + + awaitUntilConsumerIsReady(); + @SuppressWarnings("PreferJavaTimeOverload") + ConsumerRecords records = consumer.poll(Duration.ofSeconds(5).toMillis()); + assertThat(records.count()).isEqualTo(1); + + // iterate over records to generate spans + for (ConsumerRecord record : records) { + testing.runWithSpan( + "processing", + () -> { + assertThat(record.value()).isEqualTo(greeting); + assertThat(record.key()).isNull(); + }); + } + AtomicReference producerSpan = new AtomicReference<>(); + testing.waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(); + }, + span -> { + span.hasName(SHARED_TOPIC + " send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("messaging.kafka.message.offset"), + AbstractLongAssert::isNotNegative)); + if (testHeaders) { + span.hasAttributesSatisfying( + equalTo( + AttributeKey.stringArrayKey("messaging.header.test_message_header"), + Collections.singletonList("test"))); + } + }, + span -> { + span.hasName("producer callback") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)); + }); + producerSpan.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName(SHARED_TOPIC + " receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")); + if (testHeaders) { + span.hasAttributesSatisfying( + equalTo( + AttributeKey.stringArrayKey("messaging.header.test_message_header"), + Collections.singletonList("test"))); + } + }, + span -> { + span.hasName(SHARED_TOPIC + " process") + .hasKind(SpanKind.CONSUMER) + .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + equalTo( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + greeting.getBytes(StandardCharsets.UTF_8).length), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("messaging.kafka.message.offset"), + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("kafka.record.queue_time_ms"), + AbstractLongAssert::isNotNegative)); + + if (testHeaders) { + span.hasAttributesSatisfying( + equalTo( + AttributeKey.stringArrayKey("messaging.header.test_message_header"), + Collections.singletonList("test"))); + } + }, + span -> span.hasName("processing").hasParent(trace.getSpan(1)))); + } + + @DisplayName("test pass through tombstone") + @Test + void testPassThroughTombstone() + throws ExecutionException, InterruptedException, TimeoutException { + producer.send(new ProducerRecord<>(SHARED_TOPIC, null)).get(5, TimeUnit.SECONDS); + awaitUntilConsumerIsReady(); + @SuppressWarnings("PreferJavaTimeOverload") + ConsumerRecords records = consumer.poll(Duration.ofSeconds(5).toMillis()); + assertThat(records.count()).isEqualTo(1); + + // iterate over records to generate spans + for (ConsumerRecord record : records) { + assertThat(record.value()).isNull(); + assertThat(record.key()).isNull(); + } + + AtomicReference producerSpan = new AtomicReference<>(); + testing.waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName(SHARED_TOPIC + " send") + .hasKind(SpanKind.PRODUCER) + .hasNoParent() + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE, true), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("messaging.kafka.message.offset"), + AbstractLongAssert::isNotNegative)); + }); + producerSpan.set(trace.getSpan(0)); + }, + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName(SHARED_TOPIC + " receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")); + }, + span -> { + span.hasName(SHARED_TOPIC + " process") + .hasKind(SpanKind.CONSUMER) + .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + equalTo(SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE, true), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, -1L), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("messaging.kafka.message.offset"), + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("kafka.record.queue_time_ms"), + AbstractLongAssert::isNotNegative)); + }); + }); + } + + @DisplayName("test records(TopicPartition) kafka consume") + @Test + void testRecordsWithTopicPartitionKafkaConsume() + throws ExecutionException, InterruptedException, TimeoutException { + String greeting = "Hello from MockConsumer!"; + producer + .send(new ProducerRecord<>(SHARED_TOPIC, partition, null, greeting)) + .get(5, TimeUnit.SECONDS); + + testing.waitForTraces(1); + + awaitUntilConsumerIsReady(); + @SuppressWarnings("PreferJavaTimeOverload") + ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis()); + List> recordsInPartition = + consumerRecords.records(KafkaClientBaseTest.topicPartition); + assertThat(recordsInPartition.size()).isEqualTo(1); + + // iterate over records to generate spans + for (ConsumerRecord record : recordsInPartition) { + assertThat(record.value()).isEqualTo(greeting); + assertThat(record.key()).isNull(); + } + + AtomicReference producerSpan = new AtomicReference<>(); + testing.waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName(SHARED_TOPIC + " send") + .hasKind(SpanKind.PRODUCER) + .hasNoParent() + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_KAFKA_PARTITION, partition), + satisfies( + AttributeKey.longKey("messaging.kafka.message.offset"), + AbstractLongAssert::isNotNegative)); + }); + producerSpan.set(trace.getSpan(0)); + }, + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName(SHARED_TOPIC + " receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")); + }, + span -> { + span.hasName(SHARED_TOPIC + " process") + .hasKind(SpanKind.CONSUMER) + .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + equalTo( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + greeting.getBytes(StandardCharsets.UTF_8).length), + equalTo(SemanticAttributes.MESSAGING_KAFKA_PARTITION, partition), + satisfies( + AttributeKey.longKey("messaging.kafka.message.offset"), + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("kafka.record.queue_time_ms"), + AbstractLongAssert::isNotNegative)); + }); + }); + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/instrumentation/kafkaclients/KafkaClientPropagationDisabledTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/instrumentation/kafkaclients/KafkaClientPropagationDisabledTest.java new file mode 100644 index 0000000000..c5925fd8cc --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/instrumentation/kafkaclients/KafkaClientPropagationDisabledTest.java @@ -0,0 +1,116 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.kafka.internal.KafkaClientPropagationBaseTest; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Collections; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.assertj.core.api.AbstractLongAssert; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class KafkaClientPropagationDisabledTest extends KafkaClientPropagationBaseTest { + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @DisplayName("should not read remote context when consuming messages if propagation is disabled") + @Test + void testReadRemoteContextWhenPropagationIsDisabled() throws InterruptedException { + String message = "Testing without headers"; + producer.send(new ProducerRecord<>(SHARED_TOPIC, message)); + + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName(SHARED_TOPIC + " send") + .hasKind(SpanKind.PRODUCER) + .hasNoParent() + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("messaging.kafka.message.offset"), + AbstractLongAssert::isNotNegative)); + }); + }); + + awaitUntilConsumerIsReady(); + + @SuppressWarnings("PreferJavaTimeOverload") + ConsumerRecords records = consumer.poll(Duration.ofSeconds(5).toMillis()); + assertThat(records.count()).isEqualTo(1); + + // iterate over records to generate spans + for (ConsumerRecord ignored : records) { + testing.runWithSpan("processing", () -> {}); + } + + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName(SHARED_TOPIC + " send") + .hasKind(SpanKind.PRODUCER) + .hasNoParent() + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("messaging.kafka.message.offset"), + AbstractLongAssert::isNotNegative)); + }); + }, + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName(SHARED_TOPIC + " process") + .hasKind(SpanKind.CONSUMER) + .hasLinks(Collections.emptyList()) + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + equalTo( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + message.getBytes(StandardCharsets.UTF_8).length), + equalTo(SemanticAttributes.MESSAGING_KAFKA_PARTITION, partition), + satisfies( + AttributeKey.longKey("messaging.kafka.message.offset"), + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("kafka.record.queue_time_ms"), + AbstractLongAssert::isNotNegative)); + }, + span -> { + span.hasName("processing").hasParent(trace.getSpan(0)); + }); + }); + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/instrumentation/kafkaclients/KafkaClientSuppressReceiveSpansTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/instrumentation/kafkaclients/KafkaClientSuppressReceiveSpansTest.java new file mode 100644 index 0000000000..d9d7395637 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/instrumentation/kafkaclients/KafkaClientSuppressReceiveSpansTest.java @@ -0,0 +1,242 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients; + +import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.kafka.internal.KafkaClientBaseTest; +import io.opentelemetry.instrumentation.kafka.internal.KafkaClientPropagationBaseTest; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.assertj.core.api.AbstractLongAssert; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class KafkaClientSuppressReceiveSpansTest extends KafkaClientPropagationBaseTest { + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Test + void testKafkaProduceAndConsume() throws InterruptedException { + String greeting = "Hello Kafka!"; + testing.runWithSpan( + "parent", + () -> { + producer.send( + new ProducerRecord<>(SHARED_TOPIC, greeting), + (meta, ex) -> { + if (ex == null) { + testing.runWithSpan("producer callback", () -> {}); + } else { + testing.runWithSpan("producer exception: " + ex, () -> {}); + } + }); + }); + + awaitUntilConsumerIsReady(); + // check that the message was received + @SuppressWarnings("PreferJavaTimeOverload") + ConsumerRecords records = consumer.poll(Duration.ofSeconds(5).toMillis()); + for (ConsumerRecord record : records) { + testing.runWithSpan( + "processing", + () -> { + assertThat(record.value()).isEqualTo(greeting); + assertThat(record.key()).isNull(); + }); + } + + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(); + }, + span -> { + span.hasName(SHARED_TOPIC + " send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("messaging.kafka.message.offset"), + AbstractLongAssert::isNotNegative)); + }, + span -> { + span.hasName(SHARED_TOPIC + " process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + equalTo( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + greeting.getBytes(StandardCharsets.UTF_8).length), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("messaging.kafka.message.offset"), + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("kafka.record.queue_time_ms"), + AbstractLongAssert::isNotNegative)); + }, + span -> { + span.hasName("processing").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(2)); + }, + span -> { + span.hasName("producer callback") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)); + }); + }); + } + + @Test + void testPassThroughTombstone() + throws ExecutionException, InterruptedException, TimeoutException { + producer.send(new ProducerRecord<>(SHARED_TOPIC, null)).get(5, TimeUnit.SECONDS); + awaitUntilConsumerIsReady(); + @SuppressWarnings("PreferJavaTimeOverload") + ConsumerRecords records = consumer.poll(Duration.ofSeconds(5).toMillis()); + assertThat(records.count()).isEqualTo(1); + + // iterate over records to generate spans + for (ConsumerRecord record : records) { + assertThat(record.value()).isNull(); + assertThat(record.key()).isNull(); + } + + testing.waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName(SHARED_TOPIC + " send") + .hasKind(SpanKind.PRODUCER) + .hasNoParent() + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE, true), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("messaging.kafka.message.offset"), + AbstractLongAssert::isNotNegative)); + }, + span -> { + span.hasName(SHARED_TOPIC + " process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + equalTo(SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE, true), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, -1L), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("messaging.kafka.message.offset"), + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("kafka.record.queue_time_ms"), + AbstractLongAssert::isNotNegative)); + }); + }); + } + + @Test + void testRecordsWithTopicPartitionKafkaConsume() + throws ExecutionException, InterruptedException, TimeoutException { + String greeting = "Hello from MockConsumer!"; + producer + .send(new ProducerRecord<>(SHARED_TOPIC, partition, null, greeting)) + .get(5, TimeUnit.SECONDS); + + testing.waitForTraces(1); + + awaitUntilConsumerIsReady(); + @SuppressWarnings("PreferJavaTimeOverload") + ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis()); + List> recordsInPartition = + consumerRecords.records(KafkaClientBaseTest.topicPartition); + assertThat(recordsInPartition.size()).isEqualTo(1); + + // iterate over records to generate spans + for (ConsumerRecord record : recordsInPartition) { + assertThat(record.value()).isEqualTo(greeting); + assertThat(record.key()).isNull(); + } + + testing.waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName(SHARED_TOPIC + " send") + .hasKind(SpanKind.PRODUCER) + .hasNoParent() + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_KAFKA_PARTITION, partition), + satisfies( + AttributeKey.longKey("messaging.kafka.message.offset"), + AbstractLongAssert::isNotNegative)); + }, + span -> { + span.hasName(SHARED_TOPIC + " process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + equalTo( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + greeting.getBytes(StandardCharsets.UTF_8).length), + equalTo(SemanticAttributes.MESSAGING_KAFKA_PARTITION, partition), + satisfies( + AttributeKey.longKey("messaging.kafka.message.offset"), + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("kafka.record.queue_time_ms"), + AbstractLongAssert::isNotNegative)); + }); + }); + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientBaseTest.groovy b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientBaseTest.groovy deleted file mode 100644 index 052396fdd4..0000000000 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientBaseTest.groovy +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.kafkaclients - -import io.opentelemetry.instrumentation.test.InstrumentationSpecification -import org.testcontainers.utility.DockerImageName - -import java.time.Duration -import org.apache.kafka.clients.admin.AdminClient -import org.apache.kafka.clients.admin.NewTopic -import org.apache.kafka.clients.consumer.Consumer -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.producer.Producer -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.serialization.IntegerDeserializer -import org.apache.kafka.common.serialization.IntegerSerializer -import org.apache.kafka.common.serialization.StringDeserializer -import org.apache.kafka.common.serialization.StringSerializer -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import org.testcontainers.containers.KafkaContainer -import org.testcontainers.containers.output.Slf4jLogConsumer -import org.testcontainers.containers.wait.strategy.Wait -import spock.lang.Shared - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit - -abstract class KafkaClientBaseTest extends InstrumentationSpecification { - private static final Logger logger = LoggerFactory.getLogger(KafkaClientBaseTest) - - protected static final SHARED_TOPIC = "shared.topic" - - @Shared - KafkaContainer kafka - @Shared - Producer producer - @Shared - Consumer consumer - @Shared - CountDownLatch consumerReady = new CountDownLatch(1) - - static TopicPartition topicPartition = new TopicPartition(SHARED_TOPIC, 0) - - def setupSpec() { - kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3")) - .withLogConsumer(new Slf4jLogConsumer(logger)) - .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1)) - .withStartupTimeout(Duration.ofMinutes(1)) - kafka.start() - - // create test topic - AdminClient.create(["bootstrap.servers": kafka.bootstrapServers]).withCloseable { admin -> - admin.createTopics([new NewTopic(SHARED_TOPIC, 1, (short) 1)]).all().get(30, TimeUnit.SECONDS) - } - - producer = new KafkaProducer<>(producerProps()) - - consumer = new KafkaConsumer<>(consumerProps()) - - consumer.subscribe([SHARED_TOPIC], new ConsumerRebalanceListener() { - @Override - void onPartitionsRevoked(Collection collection) { - } - - @Override - void onPartitionsAssigned(Collection collection) { - consumerReady.countDown() - } - }) - } - - Map producerProps() { - // values copied from spring's KafkaTestUtils - return [ - "bootstrap.servers": kafka.bootstrapServers, - "retries" : 0, - "batch.size" : "16384", - "linger.ms" : 1, - "buffer.memory" : "33554432", - "key.serializer" : IntegerSerializer, - "value.serializer" : StringSerializer - ] - } - - Map consumerProps() { - // values copied from spring's KafkaTestUtils - return [ - "bootstrap.servers" : kafka.bootstrapServers, - "group.id" : "test", - "enable.auto.commit" : "true", - "auto.commit.interval.ms": "10", - "session.timeout.ms" : "30000", - "key.deserializer" : IntegerDeserializer, - "value.deserializer" : StringDeserializer - ] - } - - def cleanupSpec() { - consumer?.close() - producer?.close() - kafka.stop() - } - - // Kafka's eventual consistency behavior forces us to do a couple of empty poll() calls until it gets properly assigned a topic partition - void awaitUntilConsumerIsReady() { - if (consumerReady.await(0, TimeUnit.SECONDS)) { - return - } - for (i in 0..<10) { - consumer.poll(0) - if (consumerReady.await(1, TimeUnit.SECONDS)) { - break - } - } - if (consumerReady.getCount() != 0) { - throw new AssertionError("Consumer wasn't assigned any partitions!") - } - consumer.seekToBeginning([]) - } -} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientPropagationBaseTest.groovy b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientPropagationBaseTest.groovy deleted file mode 100644 index 4c019daf80..0000000000 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientPropagationBaseTest.groovy +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.kafkaclients - -import io.opentelemetry.instrumentation.test.AgentTestTrait -import org.apache.kafka.clients.producer.ProducerRecord -import spock.lang.Unroll - -import java.time.Duration - -abstract class KafkaClientPropagationBaseTest extends KafkaClientBaseTest implements AgentTestTrait { - - private static final boolean producerPropagationEnabled = Boolean.parseBoolean( - System.getProperty("otel.instrumentation.kafka.producer-propagation.enabled", "true")) - - @Unroll - def "test kafka client header propagation manual config"() { - when: - String message = "Testing without headers" - producer.send(new ProducerRecord<>(SHARED_TOPIC, message)) - - then: - awaitUntilConsumerIsReady() - // check that the message was received - def records = consumer.poll(Duration.ofSeconds(5).toMillis()) - records.count() == 1 - for (record in records) { - assert record.headers().iterator().hasNext() == producerPropagationEnabled - } - } -} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java new file mode 100644 index 0000000000..5705c59f74 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java @@ -0,0 +1,141 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka.internal; + +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +@SuppressWarnings("OtelInternalJavadoc") +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class KafkaClientBaseTest { + private static final Logger logger = LoggerFactory.getLogger(KafkaClientBaseTest.class); + + protected static final String SHARED_TOPIC = "shared.topic"; + + private KafkaContainer kafka; + protected Producer producer; + protected Consumer consumer; + private final CountDownLatch consumerReady = new CountDownLatch(1); + + public static final int partition = 0; + public static final TopicPartition topicPartition = new TopicPartition(SHARED_TOPIC, partition); + + @BeforeAll + void setupClass() throws ExecutionException, InterruptedException, TimeoutException { + kafka = + new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3")) + .withLogConsumer(new Slf4jLogConsumer(logger)) + .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1)) + .withStartupTimeout(Duration.ofMinutes(1)); + kafka.start(); + + // create test topic + HashMap adminProps = new HashMap<>(); + adminProps.put("bootstrap.servers", kafka.getBootstrapServers()); + + try (AdminClient admin = AdminClient.create(adminProps)) { + admin + .createTopics(Collections.singletonList(new NewTopic(SHARED_TOPIC, 1, (short) 1))) + .all() + .get(30, TimeUnit.SECONDS); + } + + producer = new KafkaProducer<>(producerProps()); + + consumer = new KafkaConsumer<>(consumerProps()); + + consumer.subscribe( + Collections.singletonList(SHARED_TOPIC), + new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection collection) {} + + @Override + public void onPartitionsAssigned(Collection collection) { + consumerReady.countDown(); + } + }); + } + + public HashMap consumerProps() { + HashMap props = new HashMap<>(); + props.put("bootstrap.servers", kafka.getBootstrapServers()); + props.put("group.id", "test"); + props.put("enable.auto.commit", "true"); + props.put("auto.commit.interval.ms", 10); + props.put("session.timeout.ms", "30000"); + props.put("key.deserializer", IntegerDeserializer.class.getName()); + props.put("value.deserializer", StringDeserializer.class.getName()); + return props; + } + + public HashMap producerProps() { + HashMap props = new HashMap<>(); + props.put("bootstrap.servers", kafka.getBootstrapServers()); + props.put("retries", 0); + props.put("batch.size", "16384"); + props.put("linger.ms", 1); + props.put("buffer.memory", "33554432"); + props.put("key.serializer", IntegerSerializer.class.getName()); + props.put("value.serializer", StringSerializer.class.getName()); + return props; + } + + @AfterAll + void cleanupClass() { + if (producer != null) { + producer.close(); + } + if (consumer != null) { + consumer.close(); + } + kafka.stop(); + } + + @SuppressWarnings("PreferJavaTimeOverload") + public void awaitUntilConsumerIsReady() throws InterruptedException { + if (consumerReady.await(0, TimeUnit.SECONDS)) { + return; + } + for (int i = 0; i < 10; i++) { + consumer.poll(0); + if (consumerReady.await(1, TimeUnit.SECONDS)) { + break; + } + } + if (consumerReady.getCount() != 0) { + throw new AssertionError("Consumer wasn't assigned any partitions!"); + } + consumer.seekToBeginning(Collections.emptyList()); + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientPropagationBaseTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientPropagationBaseTest.java new file mode 100644 index 0000000000..3140a38840 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientPropagationBaseTest.java @@ -0,0 +1,35 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka.internal; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.Test; + +@SuppressWarnings("OtelInternalJavadoc") +public abstract class KafkaClientPropagationBaseTest extends KafkaClientBaseTest { + private static final boolean producerPropagationEnabled = + Boolean.parseBoolean( + System.getProperty("otel.instrumentation.kafka.producer-propagation.enabled", "true")); + + @Test + void testClientHeaderPropagationManualConfig() throws InterruptedException { + String message = "Testing without headers"; + producer.send(new ProducerRecord<>(SHARED_TOPIC, message)); + + awaitUntilConsumerIsReady(); + // check that the message was received + ConsumerRecords records = consumer.poll(Duration.ofSeconds(5).toMillis()); + assertThat(records.count()).isEqualTo(1); + for (ConsumerRecord record : records) { + assertThat(record.headers().iterator().hasNext()).isEqualTo(producerPropagationEnabled); + } + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/ExceptionHandlingTest.groovy b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/ExceptionHandlingTest.groovy deleted file mode 100644 index d0fd33a559..0000000000 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/ExceptionHandlingTest.groovy +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.kafkaclients - -import io.opentelemetry.instrumentation.test.InstrumentationSpecification -import io.opentelemetry.instrumentation.test.LibraryTestTrait -import java.lang.reflect.Proxy -import java.time.Duration -import org.apache.kafka.clients.consumer.Consumer -import org.apache.kafka.clients.producer.Producer - -class ExceptionHandlingTest extends InstrumentationSpecification implements LibraryTestTrait { - - def "test consumer exception propagates to caller"() throws Exception { - setup: - def consumer = Proxy.newProxyInstance(ExceptionHandlingTest.getClassLoader(), new Class[] { Consumer }) { proxy, method, args -> - throw new IllegalStateException("can't invoke") - } as Consumer - KafkaTelemetry telemetry = KafkaTelemetry.builder(getOpenTelemetry()) - .build() - def wrappedConsumer = telemetry.wrap(consumer) - - when: - wrappedConsumer.poll(Duration.ofMillis(1)) - then: - thrown IllegalStateException - } - - def "test producer exception propagates to caller"() throws Exception { - setup: - def producer = Proxy.newProxyInstance(ExceptionHandlingTest.getClassLoader(), new Class[] { Producer }) { proxy, method, args -> - throw new IllegalStateException("can't invoke") - } as Producer - KafkaTelemetry telemetry = KafkaTelemetry.builder(getOpenTelemetry()) - .build() - def wrappedProducer = telemetry.wrap(producer) - - when: - wrappedProducer.flush() - then: - thrown IllegalStateException - } -} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/InterceptorsTest.groovy b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/InterceptorsTest.groovy deleted file mode 100644 index d1a7a386bd..0000000000 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/InterceptorsTest.groovy +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.kafkaclients - -import io.opentelemetry.instrumentation.test.LibraryTestTrait -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.clients.producer.ProducerRecord -import spock.lang.Unroll - -import java.time.Duration - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import static io.opentelemetry.api.trace.SpanKind.INTERNAL -import static io.opentelemetry.api.trace.SpanKind.PRODUCER - -class InterceptorsTest extends KafkaClientBaseTest implements LibraryTestTrait { - - @Override - Map producerProps() { - def props = super.producerProps() - props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.getName()) - return props - } - - @Override - Map consumerProps() { - def props = super.consumerProps() - props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.getName()) - return props - } - - @Unroll - def "test interceptors"() throws Exception { - when: - String greeting = "Hello Kafka!" - runWithSpan("parent") { - producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex -> - if (ex == null) { - runWithSpan("producer callback") {} - } else { - runWithSpan("producer exception: " + ex) {} - } - } - } - - then: - awaitUntilConsumerIsReady() - // check that the message was received - def records = consumer.poll(Duration.ofSeconds(5).toMillis()) - records.count() == 1 - for (record in records) { - assert record.value() == greeting - assert record.key() == null - } - - assertTraces(2) { - traces.sort(orderByRootSpanKind(INTERNAL, PRODUCER, CONSUMER)) - trace(0, 3) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - span(1) { - name SHARED_TOPIC + " send" - kind PRODUCER - childOf span(0) - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - } - } - span(2) { - name SHARED_TOPIC + " receive" - kind CONSUMER - childOf span(1) - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_OPERATION" "receive" - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long - "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 } - "messaging.kafka.message.offset" { it >= 0 } - } - } - } - trace(1, 1) { - span(0) { - name "producer callback" - kind INTERNAL - hasNoParent() - } - } - } - } -} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/WrappersTest.groovy b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/WrappersTest.groovy deleted file mode 100644 index 929b6a9dac..0000000000 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/WrappersTest.groovy +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.kafkaclients - -import io.opentelemetry.instrumentation.test.LibraryTestTrait -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import java.nio.charset.StandardCharsets -import org.apache.kafka.clients.producer.ProducerRecord -import spock.lang.Unroll - -import java.time.Duration - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import static io.opentelemetry.api.trace.SpanKind.INTERNAL -import static io.opentelemetry.api.trace.SpanKind.PRODUCER -import static java.util.Collections.singletonList - -class WrappersTest extends KafkaClientBaseTest implements LibraryTestTrait { - - @Unroll - def "test wrappers, test headers: #testHeaders"() throws Exception { - KafkaTelemetry telemetry = KafkaTelemetry.builder(getOpenTelemetry()) - .setCapturedHeaders(singletonList("test-message-header")) - // TODO run tests both with and without experimental span attributes - .setCaptureExperimentalSpanAttributes(true) - .build() - - when: - String greeting = "Hello Kafka!" - def wrappedProducer = telemetry.wrap(producer) - runWithSpan("parent") { - def producerRecord = new ProducerRecord(SHARED_TOPIC, greeting) - if (testHeaders) { - producerRecord.headers().add("test-message-header", "test".getBytes(StandardCharsets.UTF_8)) - } - wrappedProducer.send(producerRecord) { meta, ex -> - if (ex == null) { - runWithSpan("producer callback") {} - } else { - runWithSpan("producer exception: " + ex) {} - } - } - } - - then: - awaitUntilConsumerIsReady() - // check that the message was received - def wrappedConsumer = telemetry.wrap(consumer) - def records = wrappedConsumer.poll(Duration.ofSeconds(5).toMillis()) - records.count() == 1 - for (record in records) { - assert record.value() == greeting - assert record.key() == null - } - - assertTraces(1) { - traces.sort(orderByRootSpanKind(INTERNAL, PRODUCER, CONSUMER)) - trace(0, 4) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - span(1) { - name SHARED_TOPIC + " send" - kind PRODUCER - childOf span(0) - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 } - "messaging.kafka.message.offset" { it >= 0 } - if (testHeaders) { - "messaging.header.test_message_header" { it == ["test"] } - } - } - } - span(2) { - name SHARED_TOPIC + " receive" - kind CONSUMER - childOf span(1) - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" - "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_OPERATION" "receive" - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long - "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 } - "messaging.kafka.message.offset" { it >= 0 } - "kafka.record.queue_time_ms" { it >= 0 } - if (testHeaders) { - "messaging.header.test_message_header" { it == ["test"] } - } - } - } - span(3) { - name "producer callback" - kind INTERNAL - childOf span(0) - } - } - } - - where: - testHeaders << [false, true] - } -} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/ExceptionHandlingTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/ExceptionHandlingTest.java new file mode 100644 index 0000000000..23947aa1b9 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/ExceptionHandlingTest.java @@ -0,0 +1,61 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka.internal; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import java.lang.reflect.Proxy; +import java.time.Duration; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class ExceptionHandlingTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @Test + void testConsumerExceptionPropagatesToCaller() { + Consumer consumer = + (Consumer) + Proxy.newProxyInstance( + ExceptionHandlingTest.class.getClassLoader(), + new Class[] {Consumer.class}, + (proxy, method, args) -> { + throw new IllegalStateException("can't invoke"); + }); + + KafkaTelemetry telemetry = KafkaTelemetry.builder(testing.getOpenTelemetry()).build(); + Consumer wrappedConsumer = telemetry.wrap(consumer); + + assertThatThrownBy(() -> wrappedConsumer.poll(Duration.ofMillis(1))) + .isInstanceOf(IllegalStateException.class) + .hasMessage("can't invoke"); + } + + @Test + void testProducerExceptionPropagatesToCaller() { + Producer producer = + (Producer) + Proxy.newProxyInstance( + ExceptionHandlingTest.class.getClassLoader(), + new Class[] {Producer.class}, + (proxy, method, args) -> { + throw new IllegalStateException("can't invoke"); + }); + + KafkaTelemetry telemetry = KafkaTelemetry.builder(testing.getOpenTelemetry()).build(); + Producer wrappedProducer = telemetry.wrap(producer); + assertThatThrownBy(wrappedProducer::flush) + .isInstanceOf(IllegalStateException.class) + .hasMessage("can't invoke"); + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/InterceptorsTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/InterceptorsTest.java new file mode 100644 index 0000000000..b8e2d91cc9 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/InterceptorsTest.java @@ -0,0 +1,119 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka.internal; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.kafkaclients.TracingConsumerInterceptor; +import io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.HashMap; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.assertj.core.api.AbstractLongAssert; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class InterceptorsTest extends KafkaClientBaseTest { + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @Override + public HashMap producerProps() { + HashMap props = super.producerProps(); + props.put( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); + return props; + } + + @Override + public HashMap consumerProps() { + HashMap props = super.consumerProps(); + props.put( + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); + return props; + } + + @Test + void testInterceptors() throws InterruptedException { + String greeting = "Hello Kafka!"; + testing.runWithSpan( + "parent", + () -> { + producer.send( + new ProducerRecord<>(SHARED_TOPIC, greeting), + (meta, ex) -> { + if (ex == null) { + testing.runWithSpan("producer callback", () -> {}); + } else { + testing.runWithSpan("producer exception: " + ex, () -> {}); + } + }); + }); + + awaitUntilConsumerIsReady(); + // check that the message was received + ConsumerRecords records = consumer.poll(Duration.ofSeconds(5)); + assertThat(records.count()).isEqualTo(1); + for (ConsumerRecord record : records) { + assertThat(record.value()).isEqualTo(greeting); + assertThat(record.key()).isNull(); + } + + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(); + }, + span -> { + span.hasName(SHARED_TOPIC + " send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")); + }, + span -> { + span.hasName(SHARED_TOPIC + " receive") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), + equalTo( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + greeting.getBytes(StandardCharsets.UTF_8).length), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("messaging.kafka.message.offset"), + AbstractLongAssert::isNotNegative)); + }); + }, + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName("producer callback").hasKind(SpanKind.INTERNAL).hasNoParent(); + }); + }); + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/WrapperTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/WrapperTest.java new file mode 100644 index 0000000000..0e074505e0 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/WrapperTest.java @@ -0,0 +1,136 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafka.internal; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Collections; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.assertj.core.api.AbstractLongAssert; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class WrapperTest extends KafkaClientBaseTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testWrappers(boolean testHeaders) throws InterruptedException { + KafkaTelemetry telemetry = + KafkaTelemetry.builder(testing.getOpenTelemetry()) + .setCapturedHeaders(singletonList("test-message-header")) + // TODO run tests both with and without experimental span attributes + .setCaptureExperimentalSpanAttributes(true) + .build(); + + String greeting = "Hello Kafka!"; + Producer wrappedProducer = telemetry.wrap(producer); + + testing.runWithSpan( + "parent", + () -> { + ProducerRecord producerRecord = + new ProducerRecord<>(SHARED_TOPIC, greeting); + if (testHeaders) { + producerRecord + .headers() + .add("test-message-header", "test".getBytes(StandardCharsets.UTF_8)); + } + wrappedProducer.send( + producerRecord, + (meta, ex) -> { + if (ex == null) { + testing.runWithSpan("producer callback", () -> {}); + } else { + testing.runWithSpan("producer exception: " + ex, () -> {}); + } + }); + }); + + awaitUntilConsumerIsReady(); + Consumer wrappedConsumer = telemetry.wrap(consumer); + ConsumerRecords records = wrappedConsumer.poll(Duration.ofSeconds(10)); + assertThat(records.count()).isEqualTo(1); + for (ConsumerRecord record : records) { + assertThat(record.value()).isEqualTo(greeting); + assertThat(record.key()).isNull(); + } + + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(); + }, + span -> { + span.hasName(SHARED_TOPIC + " send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative)); + if (testHeaders) { + span.hasAttributesSatisfying( + equalTo( + AttributeKey.stringArrayKey("messaging.header.test_message_header"), + Collections.singletonList("test"))); + } + }, + span -> { + span.hasName(SHARED_TOPIC + " receive") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfying( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), + equalTo( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + greeting.getBytes(StandardCharsets.UTF_8).length), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies( + AttributeKey.longKey("messaging.kafka.message.offset"), + AbstractLongAssert::isNotNegative)); + if (testHeaders) { + span.hasAttributesSatisfying( + equalTo( + AttributeKey.stringArrayKey("messaging.header.test_message_header"), + Collections.singletonList("test"))); + } + }, + span -> { + span.hasName("producer callback") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)); + }); + }); + } +}