Convert kafka-clients unit test from groovy to java (#7770)

Co-authored-by: Mateusz Rzeszutek <mrzeszutek@splunk.com>
This commit is contained in:
Will Li 2023-02-13 17:58:55 +08:00 committed by GitHub
parent 39e7ed4d6a
commit d3326db0cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1179 additions and 979 deletions

View File

@ -1,277 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.nio.charset.StandardCharsets
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.Duration
import java.util.concurrent.TimeUnit
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
def "test kafka produce and consume, test headers: #testHeaders"() {
when:
String greeting = "Hello Kafka!"
runWithSpan("parent") {
def producerRecord = new ProducerRecord(SHARED_TOPIC, greeting)
if (testHeaders) {
producerRecord.headers().add("test-message-header", "test".getBytes(StandardCharsets.UTF_8))
}
producer.send(producerRecord) { meta, ex ->
if (ex == null) {
runWithSpan("producer callback") {}
} else {
runWithSpan("producer exception: " + ex) {}
}
}.get(5, TimeUnit.SECONDS)
}
then:
awaitUntilConsumerIsReady()
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
records.count() == 1
// iterate over records to generate spans
for (record in records) {
runWithSpan("processing") {
assert record.value() == greeting
assert record.key() == null
}
}
assertTraces(2) {
traces.sort(orderByRootSpanKind(INTERNAL, CONSUMER))
SpanData producerSpan
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name SHARED_TOPIC + " send"
kind PRODUCER
childOf span(0)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
}
}
span(2) {
name "producer callback"
kind INTERNAL
childOf span(0)
}
producerSpan = span(1)
}
trace(1, 3) {
span(0) {
name SHARED_TOPIC + " receive"
kind CONSUMER
hasNoParent()
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
}
}
span(1) {
name SHARED_TOPIC + " process"
kind CONSUMER
childOf span(0)
hasLink producerSpan
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
"kafka.record.queue_time_ms" { it >= 0 }
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
}
}
span(2) {
name "processing"
childOf span(1)
}
}
}
where:
testHeaders << [false, true]
}
def "test pass through tombstone"() {
when:
producer.send(new ProducerRecord<>(SHARED_TOPIC, null)).get(5, TimeUnit.SECONDS)
then:
awaitUntilConsumerIsReady()
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
records.count() == 1
// iterate over records to generate spans
for (record in records) {
assert record.value() == null
assert record.key() == null
}
assertTraces(2) {
traces.sort(orderByRootSpanKind(PRODUCER, CONSUMER))
SpanData producerSpan
trace(0, 1) {
span(0) {
name SHARED_TOPIC + " send"
kind PRODUCER
hasNoParent()
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE" true
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
}
}
producerSpan = span(0)
}
trace(1, 2) {
span(0) {
name SHARED_TOPIC + " receive"
kind CONSUMER
hasNoParent()
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
}
}
span(1) {
name SHARED_TOPIC + " process"
kind CONSUMER
childOf span(0)
hasLink producerSpan
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE" true
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
"kafka.record.queue_time_ms" { it >= 0 }
}
}
}
}
}
def "test records(TopicPartition) kafka consume"() {
setup:
def partition = 0
when: "send message"
def greeting = "Hello from MockConsumer!"
producer.send(new ProducerRecord<>(SHARED_TOPIC, partition, null, greeting)).get(5, TimeUnit.SECONDS)
then: "wait for PRODUCER span"
waitForTraces(1)
when: "receive messages"
awaitUntilConsumerIsReady()
def consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis())
def recordsInPartition = consumerRecords.records(KafkaClientBaseTest.topicPartition)
recordsInPartition.size() == 1
// iterate over records to generate spans
for (record in recordsInPartition) {
assert record.value() == greeting
assert record.key() == null
}
then:
assertTraces(2) {
traces.sort(orderByRootSpanKind(PRODUCER, CONSUMER))
SpanData producerSpan
trace(0, 1) {
span(0) {
name SHARED_TOPIC + " send"
kind PRODUCER
hasNoParent()
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" partition
"messaging.kafka.message.offset" { it >= 0 }
}
}
producerSpan = span(0)
}
trace(1, 2) {
span(0) {
name SHARED_TOPIC + " receive"
kind CONSUMER
hasNoParent()
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
}
}
span(1) {
name SHARED_TOPIC + " process"
kind CONSUMER
childOf span(0)
hasLink producerSpan
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" partition
"messaging.kafka.message.offset" { it >= 0 }
"kafka.record.queue_time_ms" { it >= 0 }
}
}
}
}
}
}

View File

@ -1,90 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.Duration
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
class KafkaClientPropagationDisabledTest extends KafkaClientPropagationBaseTest {
def "should not read remote context when consuming messages if propagation is disabled"() {
when: "send message"
String message = "Testing without headers"
producer.send(new ProducerRecord<>(SHARED_TOPIC, message))
then: "producer span is created"
assertTraces(1) {
trace(0, 1) {
span(0) {
name SHARED_TOPIC + " send"
kind PRODUCER
hasNoParent()
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
}
}
}
}
when: "read message without context propagation"
awaitUntilConsumerIsReady()
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
records.count() == 1
// iterate over records to generate spans
for (record in records) {
runWithSpan("processing") {}
}
then:
assertTraces(2) {
trace(0, 1) {
span(0) {
name SHARED_TOPIC + " send"
kind PRODUCER
hasNoParent()
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
}
}
}
trace(1, 2) {
span(0) {
name SHARED_TOPIC + " process"
kind CONSUMER
hasNoLinks()
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
"kafka.record.queue_time_ms" { it >= 0 }
}
span(1) {
name "processing"
childOf span(0)
}
}
}
}
}
}

View File

@ -1,192 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import java.time.Duration
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
class KafkaClientSuppressReceiveSpansTest extends KafkaClientPropagationBaseTest {
def "test kafka produce and consume"() {
when:
String greeting = "Hello Kafka!"
runWithSpan("parent") {
producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex ->
if (ex == null) {
runWithSpan("producer callback") {}
} else {
runWithSpan("producer exception: " + ex) {}
}
}
}
then:
awaitUntilConsumerIsReady()
// check that the message was received
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
for (record in records) {
runWithSpan("processing") {
assert record.value() == greeting
assert record.key() == null
}
}
assertTraces(1) {
trace(0, 5) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name SHARED_TOPIC + " send"
kind PRODUCER
childOf span(0)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
}
}
span(2) {
name SHARED_TOPIC + " process"
kind CONSUMER
childOf span(1)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
"kafka.record.queue_time_ms" { it >= 0 }
}
}
span(3) {
name "processing"
childOf span(2)
}
span(4) {
name "producer callback"
kind INTERNAL
childOf span(0)
}
}
}
}
def "test pass through tombstone"() {
when:
producer.send(new ProducerRecord<>(SHARED_TOPIC, null))
then:
awaitUntilConsumerIsReady()
// check that the message was received
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
for (record in records) {
assert record.value() == null
assert record.key() == null
}
assertTraces(1) {
trace(0, 2) {
span(0) {
name SHARED_TOPIC + " send"
kind PRODUCER
hasNoParent()
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE" true
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
}
}
span(1) {
name SHARED_TOPIC + " process"
kind CONSUMER
childOf span(0)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE" true
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
"kafka.record.queue_time_ms" { it >= 0 }
}
}
}
}
}
def "test records(TopicPartition) kafka consume"() {
setup:
def partition = 0
when: "send message"
def greeting = "Hello from MockConsumer!"
producer.send(new ProducerRecord<>(SHARED_TOPIC, partition, null, greeting))
then: "wait for PRODUCER span"
waitForTraces(1)
awaitUntilConsumerIsReady()
when: "receive messages"
def consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis())
def recordsInPartition = consumerRecords.records(new TopicPartition(SHARED_TOPIC, partition))
for (record in recordsInPartition) {
assert record.value() == greeting
assert record.key() == null
}
then:
assertTraces(1) {
trace(0, 2) {
span(0) {
name SHARED_TOPIC + " send"
kind PRODUCER
hasNoParent()
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" partition
"messaging.kafka.message.offset" { it >= 0 }
}
}
span(1) {
name SHARED_TOPIC + " process"
kind CONSUMER
childOf span(0)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" partition
"messaging.kafka.message.offset" { it >= 0 }
"kafka.record.queue_time_ms" { it >= 0 }
}
}
}
}
}
}

View File

@ -0,0 +1,329 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients;
import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.kafka.internal.KafkaClientBaseTest;
import io.opentelemetry.instrumentation.kafka.internal.KafkaClientPropagationBaseTest;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.assertj.core.api.AbstractLongAssert;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@DisplayName("test kafka produce and consume")
@ParameterizedTest(name = "{index} => test headers: {0}")
@ValueSource(booleans = {true, false})
void testKafkaProducerAndConsumerSpan(boolean testHeaders) throws Exception {
String greeting = "Hello Kafka!";
testing.runWithSpan(
"parent",
() -> {
ProducerRecord<Integer, String> producerRecord =
new ProducerRecord<>(SHARED_TOPIC, greeting);
if (testHeaders) {
producerRecord
.headers()
.add("test-message-header", "test".getBytes(StandardCharsets.UTF_8));
}
producer
.send(
producerRecord,
(meta, ex) -> {
if (ex == null) {
testing.runWithSpan("producer callback", () -> {});
} else {
testing.runWithSpan("producer exception: " + ex, () -> {});
}
})
.get(5, TimeUnit.SECONDS);
});
awaitUntilConsumerIsReady();
@SuppressWarnings("PreferJavaTimeOverload")
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(5).toMillis());
assertThat(records.count()).isEqualTo(1);
// iterate over records to generate spans
for (ConsumerRecord<?, ?> record : records) {
testing.runWithSpan(
"processing",
() -> {
assertThat(record.value()).isEqualTo(greeting);
assertThat(record.key()).isNull();
});
}
AtomicReference<SpanData> producerSpan = new AtomicReference<>();
testing.waitAndAssertSortedTraces(
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent();
},
span -> {
span.hasName(SHARED_TOPIC + " send")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative),
satisfies(
AttributeKey.longKey("messaging.kafka.message.offset"),
AbstractLongAssert::isNotNegative));
if (testHeaders) {
span.hasAttributesSatisfying(
equalTo(
AttributeKey.stringArrayKey("messaging.header.test_message_header"),
Collections.singletonList("test")));
}
},
span -> {
span.hasName("producer callback")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0));
});
producerSpan.set(trace.getSpan(1));
},
trace ->
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(SHARED_TOPIC + " receive")
.hasKind(SpanKind.CONSUMER)
.hasNoParent()
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"));
if (testHeaders) {
span.hasAttributesSatisfying(
equalTo(
AttributeKey.stringArrayKey("messaging.header.test_message_header"),
Collections.singletonList("test")));
}
},
span -> {
span.hasName(SHARED_TOPIC + " process")
.hasKind(SpanKind.CONSUMER)
.hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
.hasParent(trace.getSpan(0))
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
equalTo(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
greeting.getBytes(StandardCharsets.UTF_8).length),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative),
satisfies(
AttributeKey.longKey("messaging.kafka.message.offset"),
AbstractLongAssert::isNotNegative),
satisfies(
AttributeKey.longKey("kafka.record.queue_time_ms"),
AbstractLongAssert::isNotNegative));
if (testHeaders) {
span.hasAttributesSatisfying(
equalTo(
AttributeKey.stringArrayKey("messaging.header.test_message_header"),
Collections.singletonList("test")));
}
},
span -> span.hasName("processing").hasParent(trace.getSpan(1))));
}
@DisplayName("test pass through tombstone")
@Test
void testPassThroughTombstone()
throws ExecutionException, InterruptedException, TimeoutException {
producer.send(new ProducerRecord<>(SHARED_TOPIC, null)).get(5, TimeUnit.SECONDS);
awaitUntilConsumerIsReady();
@SuppressWarnings("PreferJavaTimeOverload")
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(5).toMillis());
assertThat(records.count()).isEqualTo(1);
// iterate over records to generate spans
for (ConsumerRecord<?, ?> record : records) {
assertThat(record.value()).isNull();
assertThat(record.key()).isNull();
}
AtomicReference<SpanData> producerSpan = new AtomicReference<>();
testing.waitAndAssertSortedTraces(
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(SHARED_TOPIC + " send")
.hasKind(SpanKind.PRODUCER)
.hasNoParent()
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE, true),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative),
satisfies(
AttributeKey.longKey("messaging.kafka.message.offset"),
AbstractLongAssert::isNotNegative));
});
producerSpan.set(trace.getSpan(0));
},
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(SHARED_TOPIC + " receive")
.hasKind(SpanKind.CONSUMER)
.hasNoParent()
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"));
},
span -> {
span.hasName(SHARED_TOPIC + " process")
.hasKind(SpanKind.CONSUMER)
.hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
.hasParent(trace.getSpan(0))
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
equalTo(SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE, true),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, -1L),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative),
satisfies(
AttributeKey.longKey("messaging.kafka.message.offset"),
AbstractLongAssert::isNotNegative),
satisfies(
AttributeKey.longKey("kafka.record.queue_time_ms"),
AbstractLongAssert::isNotNegative));
});
});
}
@DisplayName("test records(TopicPartition) kafka consume")
@Test
void testRecordsWithTopicPartitionKafkaConsume()
throws ExecutionException, InterruptedException, TimeoutException {
String greeting = "Hello from MockConsumer!";
producer
.send(new ProducerRecord<>(SHARED_TOPIC, partition, null, greeting))
.get(5, TimeUnit.SECONDS);
testing.waitForTraces(1);
awaitUntilConsumerIsReady();
@SuppressWarnings("PreferJavaTimeOverload")
ConsumerRecords<?, ?> consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis());
List<? extends ConsumerRecord<?, ?>> recordsInPartition =
consumerRecords.records(KafkaClientBaseTest.topicPartition);
assertThat(recordsInPartition.size()).isEqualTo(1);
// iterate over records to generate spans
for (ConsumerRecord<?, ?> record : recordsInPartition) {
assertThat(record.value()).isEqualTo(greeting);
assertThat(record.key()).isNull();
}
AtomicReference<SpanData> producerSpan = new AtomicReference<>();
testing.waitAndAssertSortedTraces(
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(SHARED_TOPIC + " send")
.hasKind(SpanKind.PRODUCER)
.hasNoParent()
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_KAFKA_PARTITION, partition),
satisfies(
AttributeKey.longKey("messaging.kafka.message.offset"),
AbstractLongAssert::isNotNegative));
});
producerSpan.set(trace.getSpan(0));
},
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(SHARED_TOPIC + " receive")
.hasKind(SpanKind.CONSUMER)
.hasNoParent()
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"));
},
span -> {
span.hasName(SHARED_TOPIC + " process")
.hasKind(SpanKind.CONSUMER)
.hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
.hasParent(trace.getSpan(0))
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
equalTo(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
greeting.getBytes(StandardCharsets.UTF_8).length),
equalTo(SemanticAttributes.MESSAGING_KAFKA_PARTITION, partition),
satisfies(
AttributeKey.longKey("messaging.kafka.message.offset"),
AbstractLongAssert::isNotNegative),
satisfies(
AttributeKey.longKey("kafka.record.queue_time_ms"),
AbstractLongAssert::isNotNegative));
});
});
}
}

View File

@ -0,0 +1,116 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.kafka.internal.KafkaClientPropagationBaseTest;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.assertj.core.api.AbstractLongAssert;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
class KafkaClientPropagationDisabledTest extends KafkaClientPropagationBaseTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@DisplayName("should not read remote context when consuming messages if propagation is disabled")
@Test
void testReadRemoteContextWhenPropagationIsDisabled() throws InterruptedException {
String message = "Testing without headers";
producer.send(new ProducerRecord<>(SHARED_TOPIC, message));
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(SHARED_TOPIC + " send")
.hasKind(SpanKind.PRODUCER)
.hasNoParent()
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative),
satisfies(
AttributeKey.longKey("messaging.kafka.message.offset"),
AbstractLongAssert::isNotNegative));
});
});
awaitUntilConsumerIsReady();
@SuppressWarnings("PreferJavaTimeOverload")
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(5).toMillis());
assertThat(records.count()).isEqualTo(1);
// iterate over records to generate spans
for (ConsumerRecord<?, ?> ignored : records) {
testing.runWithSpan("processing", () -> {});
}
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(SHARED_TOPIC + " send")
.hasKind(SpanKind.PRODUCER)
.hasNoParent()
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative),
satisfies(
AttributeKey.longKey("messaging.kafka.message.offset"),
AbstractLongAssert::isNotNegative));
});
},
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(SHARED_TOPIC + " process")
.hasKind(SpanKind.CONSUMER)
.hasLinks(Collections.emptyList())
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
equalTo(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
message.getBytes(StandardCharsets.UTF_8).length),
equalTo(SemanticAttributes.MESSAGING_KAFKA_PARTITION, partition),
satisfies(
AttributeKey.longKey("messaging.kafka.message.offset"),
AbstractLongAssert::isNotNegative),
satisfies(
AttributeKey.longKey("kafka.record.queue_time_ms"),
AbstractLongAssert::isNotNegative));
},
span -> {
span.hasName("processing").hasParent(trace.getSpan(0));
});
});
}
}

View File

@ -0,0 +1,242 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients;
import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.kafka.internal.KafkaClientBaseTest;
import io.opentelemetry.instrumentation.kafka.internal.KafkaClientPropagationBaseTest;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.assertj.core.api.AbstractLongAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
class KafkaClientSuppressReceiveSpansTest extends KafkaClientPropagationBaseTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@Test
void testKafkaProduceAndConsume() throws InterruptedException {
String greeting = "Hello Kafka!";
testing.runWithSpan(
"parent",
() -> {
producer.send(
new ProducerRecord<>(SHARED_TOPIC, greeting),
(meta, ex) -> {
if (ex == null) {
testing.runWithSpan("producer callback", () -> {});
} else {
testing.runWithSpan("producer exception: " + ex, () -> {});
}
});
});
awaitUntilConsumerIsReady();
// check that the message was received
@SuppressWarnings("PreferJavaTimeOverload")
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(5).toMillis());
for (ConsumerRecord<?, ?> record : records) {
testing.runWithSpan(
"processing",
() -> {
assertThat(record.value()).isEqualTo(greeting);
assertThat(record.key()).isNull();
});
}
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent();
},
span -> {
span.hasName(SHARED_TOPIC + " send")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative),
satisfies(
AttributeKey.longKey("messaging.kafka.message.offset"),
AbstractLongAssert::isNotNegative));
},
span -> {
span.hasName(SHARED_TOPIC + " process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
equalTo(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
greeting.getBytes(StandardCharsets.UTF_8).length),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative),
satisfies(
AttributeKey.longKey("messaging.kafka.message.offset"),
AbstractLongAssert::isNotNegative),
satisfies(
AttributeKey.longKey("kafka.record.queue_time_ms"),
AbstractLongAssert::isNotNegative));
},
span -> {
span.hasName("processing").hasKind(SpanKind.INTERNAL).hasParent(trace.getSpan(2));
},
span -> {
span.hasName("producer callback")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0));
});
});
}
@Test
void testPassThroughTombstone()
throws ExecutionException, InterruptedException, TimeoutException {
producer.send(new ProducerRecord<>(SHARED_TOPIC, null)).get(5, TimeUnit.SECONDS);
awaitUntilConsumerIsReady();
@SuppressWarnings("PreferJavaTimeOverload")
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(5).toMillis());
assertThat(records.count()).isEqualTo(1);
// iterate over records to generate spans
for (ConsumerRecord<?, ?> record : records) {
assertThat(record.value()).isNull();
assertThat(record.key()).isNull();
}
testing.waitAndAssertSortedTraces(
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(SHARED_TOPIC + " send")
.hasKind(SpanKind.PRODUCER)
.hasNoParent()
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE, true),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative),
satisfies(
AttributeKey.longKey("messaging.kafka.message.offset"),
AbstractLongAssert::isNotNegative));
},
span -> {
span.hasName(SHARED_TOPIC + " process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
equalTo(SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE, true),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, -1L),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative),
satisfies(
AttributeKey.longKey("messaging.kafka.message.offset"),
AbstractLongAssert::isNotNegative),
satisfies(
AttributeKey.longKey("kafka.record.queue_time_ms"),
AbstractLongAssert::isNotNegative));
});
});
}
@Test
void testRecordsWithTopicPartitionKafkaConsume()
throws ExecutionException, InterruptedException, TimeoutException {
String greeting = "Hello from MockConsumer!";
producer
.send(new ProducerRecord<>(SHARED_TOPIC, partition, null, greeting))
.get(5, TimeUnit.SECONDS);
testing.waitForTraces(1);
awaitUntilConsumerIsReady();
@SuppressWarnings("PreferJavaTimeOverload")
ConsumerRecords<?, ?> consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis());
List<? extends ConsumerRecord<?, ?>> recordsInPartition =
consumerRecords.records(KafkaClientBaseTest.topicPartition);
assertThat(recordsInPartition.size()).isEqualTo(1);
// iterate over records to generate spans
for (ConsumerRecord<?, ?> record : recordsInPartition) {
assertThat(record.value()).isEqualTo(greeting);
assertThat(record.key()).isNull();
}
testing.waitAndAssertSortedTraces(
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(SHARED_TOPIC + " send")
.hasKind(SpanKind.PRODUCER)
.hasNoParent()
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_KAFKA_PARTITION, partition),
satisfies(
AttributeKey.longKey("messaging.kafka.message.offset"),
AbstractLongAssert::isNotNegative));
},
span -> {
span.hasName(SHARED_TOPIC + " process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
equalTo(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
greeting.getBytes(StandardCharsets.UTF_8).length),
equalTo(SemanticAttributes.MESSAGING_KAFKA_PARTITION, partition),
satisfies(
AttributeKey.longKey("messaging.kafka.message.offset"),
AbstractLongAssert::isNotNegative),
satisfies(
AttributeKey.longKey("kafka.record.queue_time_ms"),
AbstractLongAssert::isNotNegative));
});
});
}
}

View File

@ -1,126 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import org.testcontainers.utility.DockerImageName
import java.time.Duration
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.IntegerDeserializer
import org.apache.kafka.common.serialization.IntegerSerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.containers.output.Slf4jLogConsumer
import org.testcontainers.containers.wait.strategy.Wait
import spock.lang.Shared
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
abstract class KafkaClientBaseTest extends InstrumentationSpecification {
private static final Logger logger = LoggerFactory.getLogger(KafkaClientBaseTest)
protected static final SHARED_TOPIC = "shared.topic"
@Shared
KafkaContainer kafka
@Shared
Producer<Integer, String> producer
@Shared
Consumer<Integer, String> consumer
@Shared
CountDownLatch consumerReady = new CountDownLatch(1)
static TopicPartition topicPartition = new TopicPartition(SHARED_TOPIC, 0)
def setupSpec() {
kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"))
.withLogConsumer(new Slf4jLogConsumer(logger))
.waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1))
.withStartupTimeout(Duration.ofMinutes(1))
kafka.start()
// create test topic
AdminClient.create(["bootstrap.servers": kafka.bootstrapServers]).withCloseable { admin ->
admin.createTopics([new NewTopic(SHARED_TOPIC, 1, (short) 1)]).all().get(30, TimeUnit.SECONDS)
}
producer = new KafkaProducer<>(producerProps())
consumer = new KafkaConsumer<>(consumerProps())
consumer.subscribe([SHARED_TOPIC], new ConsumerRebalanceListener() {
@Override
void onPartitionsRevoked(Collection<TopicPartition> collection) {
}
@Override
void onPartitionsAssigned(Collection<TopicPartition> collection) {
consumerReady.countDown()
}
})
}
Map<String, ?> producerProps() {
// values copied from spring's KafkaTestUtils
return [
"bootstrap.servers": kafka.bootstrapServers,
"retries" : 0,
"batch.size" : "16384",
"linger.ms" : 1,
"buffer.memory" : "33554432",
"key.serializer" : IntegerSerializer,
"value.serializer" : StringSerializer
]
}
Map<String, ?> consumerProps() {
// values copied from spring's KafkaTestUtils
return [
"bootstrap.servers" : kafka.bootstrapServers,
"group.id" : "test",
"enable.auto.commit" : "true",
"auto.commit.interval.ms": "10",
"session.timeout.ms" : "30000",
"key.deserializer" : IntegerDeserializer,
"value.deserializer" : StringDeserializer
]
}
def cleanupSpec() {
consumer?.close()
producer?.close()
kafka.stop()
}
// Kafka's eventual consistency behavior forces us to do a couple of empty poll() calls until it gets properly assigned a topic partition
void awaitUntilConsumerIsReady() {
if (consumerReady.await(0, TimeUnit.SECONDS)) {
return
}
for (i in 0..<10) {
consumer.poll(0)
if (consumerReady.await(1, TimeUnit.SECONDS)) {
break
}
}
if (consumerReady.getCount() != 0) {
throw new AssertionError("Consumer wasn't assigned any partitions!")
}
consumer.seekToBeginning([])
}
}

View File

@ -1,34 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients
import io.opentelemetry.instrumentation.test.AgentTestTrait
import org.apache.kafka.clients.producer.ProducerRecord
import spock.lang.Unroll
import java.time.Duration
abstract class KafkaClientPropagationBaseTest extends KafkaClientBaseTest implements AgentTestTrait {
private static final boolean producerPropagationEnabled = Boolean.parseBoolean(
System.getProperty("otel.instrumentation.kafka.producer-propagation.enabled", "true"))
@Unroll
def "test kafka client header propagation manual config"() {
when:
String message = "Testing without headers"
producer.send(new ProducerRecord<>(SHARED_TOPIC, message))
then:
awaitUntilConsumerIsReady()
// check that the message was received
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
records.count() == 1
for (record in records) {
assert record.headers().iterator().hasNext() == producerPropagationEnabled
}
}
}

View File

@ -0,0 +1,141 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafka.internal;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;
@SuppressWarnings("OtelInternalJavadoc")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class KafkaClientBaseTest {
private static final Logger logger = LoggerFactory.getLogger(KafkaClientBaseTest.class);
protected static final String SHARED_TOPIC = "shared.topic";
private KafkaContainer kafka;
protected Producer<Integer, String> producer;
protected Consumer<Integer, String> consumer;
private final CountDownLatch consumerReady = new CountDownLatch(1);
public static final int partition = 0;
public static final TopicPartition topicPartition = new TopicPartition(SHARED_TOPIC, partition);
@BeforeAll
void setupClass() throws ExecutionException, InterruptedException, TimeoutException {
kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"))
.withLogConsumer(new Slf4jLogConsumer(logger))
.waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1))
.withStartupTimeout(Duration.ofMinutes(1));
kafka.start();
// create test topic
HashMap<String, Object> adminProps = new HashMap<>();
adminProps.put("bootstrap.servers", kafka.getBootstrapServers());
try (AdminClient admin = AdminClient.create(adminProps)) {
admin
.createTopics(Collections.singletonList(new NewTopic(SHARED_TOPIC, 1, (short) 1)))
.all()
.get(30, TimeUnit.SECONDS);
}
producer = new KafkaProducer<>(producerProps());
consumer = new KafkaConsumer<>(consumerProps());
consumer.subscribe(
Collections.singletonList(SHARED_TOPIC),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
consumerReady.countDown();
}
});
}
public HashMap<String, Object> consumerProps() {
HashMap<String, Object> props = new HashMap<>();
props.put("bootstrap.servers", kafka.getBootstrapServers());
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", 10);
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", IntegerDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
return props;
}
public HashMap<String, Object> producerProps() {
HashMap<String, Object> props = new HashMap<>();
props.put("bootstrap.servers", kafka.getBootstrapServers());
props.put("retries", 0);
props.put("batch.size", "16384");
props.put("linger.ms", 1);
props.put("buffer.memory", "33554432");
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
return props;
}
@AfterAll
void cleanupClass() {
if (producer != null) {
producer.close();
}
if (consumer != null) {
consumer.close();
}
kafka.stop();
}
@SuppressWarnings("PreferJavaTimeOverload")
public void awaitUntilConsumerIsReady() throws InterruptedException {
if (consumerReady.await(0, TimeUnit.SECONDS)) {
return;
}
for (int i = 0; i < 10; i++) {
consumer.poll(0);
if (consumerReady.await(1, TimeUnit.SECONDS)) {
break;
}
}
if (consumerReady.getCount() != 0) {
throw new AssertionError("Consumer wasn't assigned any partitions!");
}
consumer.seekToBeginning(Collections.emptyList());
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafka.internal;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
@SuppressWarnings("OtelInternalJavadoc")
public abstract class KafkaClientPropagationBaseTest extends KafkaClientBaseTest {
private static final boolean producerPropagationEnabled =
Boolean.parseBoolean(
System.getProperty("otel.instrumentation.kafka.producer-propagation.enabled", "true"));
@Test
void testClientHeaderPropagationManualConfig() throws InterruptedException {
String message = "Testing without headers";
producer.send(new ProducerRecord<>(SHARED_TOPIC, message));
awaitUntilConsumerIsReady();
// check that the message was received
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(5).toMillis());
assertThat(records.count()).isEqualTo(1);
for (ConsumerRecord<?, ?> record : records) {
assertThat(record.headers().iterator().hasNext()).isEqualTo(producerPropagationEnabled);
}
}
}

View File

@ -1,46 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import java.lang.reflect.Proxy
import java.time.Duration
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.producer.Producer
class ExceptionHandlingTest extends InstrumentationSpecification implements LibraryTestTrait {
def "test consumer exception propagates to caller"() throws Exception {
setup:
def consumer = Proxy.newProxyInstance(ExceptionHandlingTest.getClassLoader(), new Class[] { Consumer }) { proxy, method, args ->
throw new IllegalStateException("can't invoke")
} as Consumer
KafkaTelemetry telemetry = KafkaTelemetry.builder(getOpenTelemetry())
.build()
def wrappedConsumer = telemetry.wrap(consumer)
when:
wrappedConsumer.poll(Duration.ofMillis(1))
then:
thrown IllegalStateException
}
def "test producer exception propagates to caller"() throws Exception {
setup:
def producer = Proxy.newProxyInstance(ExceptionHandlingTest.getClassLoader(), new Class[] { Producer }) { proxy, method, args ->
throw new IllegalStateException("can't invoke")
} as Producer
KafkaTelemetry telemetry = KafkaTelemetry.builder(getOpenTelemetry())
.build()
def wrappedProducer = telemetry.wrap(producer)
when:
wrappedProducer.flush()
then:
thrown IllegalStateException
}
}

View File

@ -1,103 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import spock.lang.Unroll
import java.time.Duration
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
class InterceptorsTest extends KafkaClientBaseTest implements LibraryTestTrait {
@Override
Map<String, ?> producerProps() {
def props = super.producerProps()
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.getName())
return props
}
@Override
Map<String, ?> consumerProps() {
def props = super.consumerProps()
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.getName())
return props
}
@Unroll
def "test interceptors"() throws Exception {
when:
String greeting = "Hello Kafka!"
runWithSpan("parent") {
producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex ->
if (ex == null) {
runWithSpan("producer callback") {}
} else {
runWithSpan("producer exception: " + ex) {}
}
}
}
then:
awaitUntilConsumerIsReady()
// check that the message was received
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
records.count() == 1
for (record in records) {
assert record.value() == greeting
assert record.key() == null
}
assertTraces(2) {
traces.sort(orderByRootSpanKind(INTERNAL, PRODUCER, CONSUMER))
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name SHARED_TOPIC + " send"
kind PRODUCER
childOf span(0)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
}
}
span(2) {
name SHARED_TOPIC + " receive"
kind CONSUMER
childOf span(1)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
}
}
}
trace(1, 1) {
span(0) {
name "producer callback"
kind INTERNAL
hasNoParent()
}
}
}
}
}

View File

@ -1,111 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafkaclients
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.nio.charset.StandardCharsets
import org.apache.kafka.clients.producer.ProducerRecord
import spock.lang.Unroll
import java.time.Duration
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
import static java.util.Collections.singletonList
class WrappersTest extends KafkaClientBaseTest implements LibraryTestTrait {
@Unroll
def "test wrappers, test headers: #testHeaders"() throws Exception {
KafkaTelemetry telemetry = KafkaTelemetry.builder(getOpenTelemetry())
.setCapturedHeaders(singletonList("test-message-header"))
// TODO run tests both with and without experimental span attributes
.setCaptureExperimentalSpanAttributes(true)
.build()
when:
String greeting = "Hello Kafka!"
def wrappedProducer = telemetry.wrap(producer)
runWithSpan("parent") {
def producerRecord = new ProducerRecord(SHARED_TOPIC, greeting)
if (testHeaders) {
producerRecord.headers().add("test-message-header", "test".getBytes(StandardCharsets.UTF_8))
}
wrappedProducer.send(producerRecord) { meta, ex ->
if (ex == null) {
runWithSpan("producer callback") {}
} else {
runWithSpan("producer exception: " + ex) {}
}
}
}
then:
awaitUntilConsumerIsReady()
// check that the message was received
def wrappedConsumer = telemetry.wrap(consumer)
def records = wrappedConsumer.poll(Duration.ofSeconds(5).toMillis())
records.count() == 1
for (record in records) {
assert record.value() == greeting
assert record.key() == null
}
assertTraces(1) {
traces.sort(orderByRootSpanKind(INTERNAL, PRODUCER, CONSUMER))
trace(0, 4) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name SHARED_TOPIC + " send"
kind PRODUCER
childOf span(0)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
}
}
span(2) {
name SHARED_TOPIC + " receive"
kind CONSUMER
childOf span(1)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
"kafka.record.queue_time_ms" { it >= 0 }
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
}
}
span(3) {
name "producer callback"
kind INTERNAL
childOf span(0)
}
}
}
where:
testHeaders << [false, true]
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafka.internal;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import java.lang.reflect.Proxy;
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
class ExceptionHandlingTest {
@RegisterExtension
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
@Test
void testConsumerExceptionPropagatesToCaller() {
Consumer<?, ?> consumer =
(Consumer<?, ?>)
Proxy.newProxyInstance(
ExceptionHandlingTest.class.getClassLoader(),
new Class<?>[] {Consumer.class},
(proxy, method, args) -> {
throw new IllegalStateException("can't invoke");
});
KafkaTelemetry telemetry = KafkaTelemetry.builder(testing.getOpenTelemetry()).build();
Consumer<?, ?> wrappedConsumer = telemetry.wrap(consumer);
assertThatThrownBy(() -> wrappedConsumer.poll(Duration.ofMillis(1)))
.isInstanceOf(IllegalStateException.class)
.hasMessage("can't invoke");
}
@Test
void testProducerExceptionPropagatesToCaller() {
Producer<?, ?> producer =
(Producer<?, ?>)
Proxy.newProxyInstance(
ExceptionHandlingTest.class.getClassLoader(),
new Class<?>[] {Producer.class},
(proxy, method, args) -> {
throw new IllegalStateException("can't invoke");
});
KafkaTelemetry telemetry = KafkaTelemetry.builder(testing.getOpenTelemetry()).build();
Producer<?, ?> wrappedProducer = telemetry.wrap(producer);
assertThatThrownBy(wrappedProducer::flush)
.isInstanceOf(IllegalStateException.class)
.hasMessage("can't invoke");
}
}

View File

@ -0,0 +1,119 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafka.internal;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.kafkaclients.TracingConsumerInterceptor;
import io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.assertj.core.api.AbstractLongAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
class InterceptorsTest extends KafkaClientBaseTest {
@RegisterExtension
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
@Override
public HashMap<String, Object> producerProps() {
HashMap<String, Object> props = super.producerProps();
props.put(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
return props;
}
@Override
public HashMap<String, Object> consumerProps() {
HashMap<String, Object> props = super.consumerProps();
props.put(
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
return props;
}
@Test
void testInterceptors() throws InterruptedException {
String greeting = "Hello Kafka!";
testing.runWithSpan(
"parent",
() -> {
producer.send(
new ProducerRecord<>(SHARED_TOPIC, greeting),
(meta, ex) -> {
if (ex == null) {
testing.runWithSpan("producer callback", () -> {});
} else {
testing.runWithSpan("producer exception: " + ex, () -> {});
}
});
});
awaitUntilConsumerIsReady();
// check that the message was received
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(5));
assertThat(records.count()).isEqualTo(1);
for (ConsumerRecord<?, ?> record : records) {
assertThat(record.value()).isEqualTo(greeting);
assertThat(record.key()).isNull();
}
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent();
},
span -> {
span.hasName(SHARED_TOPIC + " send")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"));
},
span -> {
span.hasName(SHARED_TOPIC + " receive")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"),
equalTo(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
greeting.getBytes(StandardCharsets.UTF_8).length),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative),
satisfies(
AttributeKey.longKey("messaging.kafka.message.offset"),
AbstractLongAssert::isNotNegative));
});
},
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName("producer callback").hasKind(SpanKind.INTERNAL).hasNoParent();
});
});
}
}

View File

@ -0,0 +1,136 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafka.internal;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.assertj.core.api.AbstractLongAssert;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
class WrapperTest extends KafkaClientBaseTest {
@RegisterExtension
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testWrappers(boolean testHeaders) throws InterruptedException {
KafkaTelemetry telemetry =
KafkaTelemetry.builder(testing.getOpenTelemetry())
.setCapturedHeaders(singletonList("test-message-header"))
// TODO run tests both with and without experimental span attributes
.setCaptureExperimentalSpanAttributes(true)
.build();
String greeting = "Hello Kafka!";
Producer<Integer, String> wrappedProducer = telemetry.wrap(producer);
testing.runWithSpan(
"parent",
() -> {
ProducerRecord<Integer, String> producerRecord =
new ProducerRecord<>(SHARED_TOPIC, greeting);
if (testHeaders) {
producerRecord
.headers()
.add("test-message-header", "test".getBytes(StandardCharsets.UTF_8));
}
wrappedProducer.send(
producerRecord,
(meta, ex) -> {
if (ex == null) {
testing.runWithSpan("producer callback", () -> {});
} else {
testing.runWithSpan("producer exception: " + ex, () -> {});
}
});
});
awaitUntilConsumerIsReady();
Consumer<Integer, String> wrappedConsumer = telemetry.wrap(consumer);
ConsumerRecords<?, ?> records = wrappedConsumer.poll(Duration.ofSeconds(10));
assertThat(records.count()).isEqualTo(1);
for (ConsumerRecord<?, ?> record : records) {
assertThat(record.value()).isEqualTo(greeting);
assertThat(record.key()).isNull();
}
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent();
},
span -> {
span.hasName(SHARED_TOPIC + " send")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative));
if (testHeaders) {
span.hasAttributesSatisfying(
equalTo(
AttributeKey.stringArrayKey("messaging.header.test_message_header"),
Collections.singletonList("test")));
}
},
span -> {
span.hasName(SHARED_TOPIC + " receive")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfying(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"),
equalTo(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
greeting.getBytes(StandardCharsets.UTF_8).length),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative),
satisfies(
AttributeKey.longKey("messaging.kafka.message.offset"),
AbstractLongAssert::isNotNegative));
if (testHeaders) {
span.hasAttributesSatisfying(
equalTo(
AttributeKey.stringArrayKey("messaging.header.test_message_header"),
Collections.singletonList("test")));
}
},
span -> {
span.hasName("producer callback")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0));
});
});
}
}