Implement ExtendedTextMapGetter in kafka-clients instrumentation (#13068)

This commit is contained in:
xiepuhuan 2025-01-21 09:43:48 +08:00 committed by GitHub
parent 946babbf1f
commit ab09fcee98
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 63 additions and 13 deletions

View File

@ -108,7 +108,7 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
.hasLinks(LinkData.create(producerSpan.get().getSpanContext())) .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
.hasParent(trace.getSpan(0)) .hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly( .hasAttributesSatisfyingExactly(
processAttributes("10", greeting, testHeaders)), processAttributes("10", greeting, testHeaders, false)),
span -> span.hasName("processing").hasParent(trace.getSpan(1)))); span -> span.hasName("processing").hasParent(trace.getSpan(1))));
} }
@ -152,7 +152,8 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
.hasKind(SpanKind.CONSUMER) .hasKind(SpanKind.CONSUMER)
.hasLinks(LinkData.create(producerSpan.get().getSpanContext())) .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
.hasParent(trace.getSpan(0)) .hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(processAttributes(null, null, false)))); .hasAttributesSatisfyingExactly(
processAttributes(null, null, false, false))));
} }
@DisplayName("test records(TopicPartition) kafka consume") @DisplayName("test records(TopicPartition) kafka consume")
@ -203,6 +204,7 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
.hasKind(SpanKind.CONSUMER) .hasKind(SpanKind.CONSUMER)
.hasLinks(LinkData.create(producerSpan.get().getSpanContext())) .hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
.hasParent(trace.getSpan(0)) .hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(processAttributes(null, greeting, false)))); .hasAttributesSatisfyingExactly(
processAttributes(null, greeting, false, false))));
} }
} }

View File

@ -64,7 +64,8 @@ class KafkaClientPropagationDisabledTest extends KafkaClientPropagationBaseTest
span.hasName(SHARED_TOPIC + " process") span.hasName(SHARED_TOPIC + " process")
.hasKind(SpanKind.CONSUMER) .hasKind(SpanKind.CONSUMER)
.hasLinks(Collections.emptyList()) .hasLinks(Collections.emptyList())
.hasAttributesSatisfyingExactly(processAttributes(null, message, false)), .hasAttributesSatisfyingExactly(
processAttributes(null, message, false, false)),
span -> span.hasName("processing").hasParent(trace.getSpan(0)))); span -> span.hasName("processing").hasParent(trace.getSpan(0))));
} }
} }

View File

@ -12,6 +12,7 @@ import io.opentelemetry.instrumentation.kafka.internal.KafkaClientBaseTest;
import io.opentelemetry.instrumentation.kafka.internal.KafkaClientPropagationBaseTest; import io.opentelemetry.instrumentation.kafka.internal.KafkaClientPropagationBaseTest;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import java.nio.charset.StandardCharsets;
import java.time.Duration; import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -33,8 +34,19 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientPropagationBaseTest
testing.runWithSpan( testing.runWithSpan(
"parent", "parent",
() -> { () -> {
ProducerRecord<Integer, String> producerRecord =
new ProducerRecord<>(SHARED_TOPIC, 10, greeting);
producerRecord
.headers()
// adding baggage header in w3c baggage format
.add(
"baggage",
"test-baggage-key-1=test-baggage-value-1".getBytes(StandardCharsets.UTF_8))
.add(
"baggage",
"test-baggage-key-2=test-baggage-value-2".getBytes(StandardCharsets.UTF_8));
producer.send( producer.send(
new ProducerRecord<>(SHARED_TOPIC, 10, greeting), producerRecord,
(meta, ex) -> { (meta, ex) -> {
if (ex == null) { if (ex == null) {
testing.runWithSpan("producer callback", () -> {}); testing.runWithSpan("producer callback", () -> {});
@ -70,7 +82,8 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientPropagationBaseTest
span.hasName(SHARED_TOPIC + " process") span.hasName(SHARED_TOPIC + " process")
.hasKind(SpanKind.CONSUMER) .hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1)) .hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(processAttributes("10", greeting, false)), .hasAttributesSatisfyingExactly(
processAttributes("10", greeting, false, true)),
span -> span ->
span.hasName("processing") span.hasName("processing")
.hasKind(SpanKind.INTERNAL) .hasKind(SpanKind.INTERNAL)
@ -108,7 +121,8 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientPropagationBaseTest
span.hasName(SHARED_TOPIC + " process") span.hasName(SHARED_TOPIC + " process")
.hasKind(SpanKind.CONSUMER) .hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0)) .hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(processAttributes(null, null, false)))); .hasAttributesSatisfyingExactly(
processAttributes(null, null, false, false))));
} }
@Test @Test
@ -146,6 +160,7 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientPropagationBaseTest
span.hasName(SHARED_TOPIC + " process") span.hasName(SHARED_TOPIC + " process")
.hasKind(SpanKind.CONSUMER) .hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0)) .hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(processAttributes(null, greeting, false)))); .hasAttributesSatisfyingExactly(
processAttributes(null, greeting, false, false))));
} }
} }

View File

@ -216,7 +216,7 @@ public abstract class KafkaClientBaseTest {
@SuppressWarnings("deprecation") // using deprecated semconv @SuppressWarnings("deprecation") // using deprecated semconv
protected static List<AttributeAssertion> processAttributes( protected static List<AttributeAssertion> processAttributes(
String messageKey, String messageValue, boolean testHeaders) { String messageKey, String messageValue, boolean testHeaders, boolean testMultiBaggage) {
List<AttributeAssertion> assertions = List<AttributeAssertion> assertions =
new ArrayList<>( new ArrayList<>(
Arrays.asList( Arrays.asList(
@ -249,6 +249,11 @@ public abstract class KafkaClientBaseTest {
AttributeKey.stringArrayKey("messaging.header.test_message_header"), AttributeKey.stringArrayKey("messaging.header.test_message_header"),
Collections.singletonList("test"))); Collections.singletonList("test")));
} }
if (testMultiBaggage) {
assertions.add(equalTo(AttributeKey.stringKey("test-baggage-key-1"), "test-baggage-value-1"));
assertions.add(equalTo(AttributeKey.stringKey("test-baggage-key-2"), "test-baggage-value-2"));
}
return assertions; return assertions;
} }
} }

View File

@ -10,6 +10,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.instrumentation.kafka.internal.KafkaClientBaseTest; import io.opentelemetry.instrumentation.kafka.internal.KafkaClientBaseTest;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import java.nio.charset.StandardCharsets;
import java.time.Duration; import java.time.Duration;
import java.util.Map; import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
@ -48,8 +49,19 @@ abstract class AbstractInterceptorsTest extends KafkaClientBaseTest {
testing.runWithSpan( testing.runWithSpan(
"parent", "parent",
() -> { () -> {
ProducerRecord<Integer, String> producerRecord =
new ProducerRecord<>(SHARED_TOPIC, greeting);
producerRecord
.headers()
// adding baggage header in w3c baggage format
.add(
"baggage",
"test-baggage-key-1=test-baggage-value-1".getBytes(StandardCharsets.UTF_8))
.add(
"baggage",
"test-baggage-key-2=test-baggage-value-2".getBytes(StandardCharsets.UTF_8));
producer.send( producer.send(
new ProducerRecord<>(SHARED_TOPIC, greeting), producerRecord,
(meta, ex) -> { (meta, ex) -> {
if (ex == null) { if (ex == null) {
testing.runWithSpan("producer callback", () -> {}); testing.runWithSpan("producer callback", () -> {});

View File

@ -15,6 +15,7 @@ import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.SpanKind;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import org.assertj.core.api.AbstractLongAssert; import org.assertj.core.api.AbstractLongAssert;
@ -59,7 +60,13 @@ class InterceptorsSuppressReceiveSpansTest extends AbstractInterceptorsTest {
equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"), equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"),
satisfies( satisfies(
MESSAGING_CLIENT_ID, MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("consumer"))), stringAssert -> stringAssert.startsWith("consumer")),
equalTo(
AttributeKey.stringKey("test-baggage-key-1"),
"test-baggage-value-1"),
equalTo(
AttributeKey.stringKey("test-baggage-key-2"),
"test-baggage-value-2")),
span -> span ->
span.hasName("process child") span.hasName("process child")
.hasKind(SpanKind.INTERNAL) .hasKind(SpanKind.INTERNAL)

View File

@ -5,14 +5,15 @@
package io.opentelemetry.instrumentation.kafka.internal; package io.opentelemetry.instrumentation.kafka.internal;
import io.opentelemetry.context.propagation.TextMapGetter; import io.opentelemetry.context.propagation.internal.ExtendedTextMapGetter;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Header;
enum KafkaConsumerRecordGetter implements TextMapGetter<KafkaProcessRequest> { enum KafkaConsumerRecordGetter implements ExtendedTextMapGetter<KafkaProcessRequest> {
INSTANCE; INSTANCE;
@Override @Override
@ -35,4 +36,11 @@ enum KafkaConsumerRecordGetter implements TextMapGetter<KafkaProcessRequest> {
} }
return new String(value, StandardCharsets.UTF_8); return new String(value, StandardCharsets.UTF_8);
} }
@Override
public Iterator<String> getAll(@Nullable KafkaProcessRequest carrier, String key) {
return StreamSupport.stream(carrier.getRecord().headers().headers(key).spliterator(), false)
.map(header -> new String(header.value(), StandardCharsets.UTF_8))
.iterator();
}
} }