Capture messaging header value as span attribute (#6454)

* Capture messaging header value as span attribute

* add comment
This commit is contained in:
Lauri Tulmin 2022-08-12 03:28:04 +03:00 committed by GitHub
parent fea157fc70
commit 07d7cfd551
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
50 changed files with 871 additions and 66 deletions

View File

@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.api.instrumenter.messaging;
import static java.util.Collections.unmodifiableList;
import io.opentelemetry.api.common.AttributeKey;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
final class CapturedMessageHeadersUtil {
private static final ConcurrentMap<String, AttributeKey<List<String>>> attributeKeysCache =
new ConcurrentHashMap<>();
static List<String> lowercase(List<String> names) {
return unmodifiableList(
names.stream().map(s -> s.toLowerCase(Locale.ROOT)).collect(Collectors.toList()));
}
static AttributeKey<List<String>> attributeKey(String headerName) {
return attributeKeysCache.computeIfAbsent(headerName, n -> createKey(n));
}
private static AttributeKey<List<String>> createKey(String headerName) {
// headerName is always lowercase, see MessagingAttributesExtractor
String key = "messaging.header." + headerName.replace('-', '_');
return AttributeKey.stringArrayKey(key);
}
private CapturedMessageHeadersUtil() {}
}

View File

@ -5,6 +5,8 @@
package io.opentelemetry.instrumentation.api.instrumenter.messaging;
import static io.opentelemetry.instrumentation.api.instrumenter.messaging.CapturedMessageHeadersUtil.attributeKey;
import static io.opentelemetry.instrumentation.api.instrumenter.messaging.CapturedMessageHeadersUtil.lowercase;
import static io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation.PROCESS;
import static io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation.RECEIVE;
import static io.opentelemetry.instrumentation.api.internal.AttributesExtractorUtil.internalSet;
@ -15,6 +17,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.internal.SpanKey;
import io.opentelemetry.instrumentation.api.internal.SpanKeyProvider;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.List;
import javax.annotation.Nullable;
/**
@ -31,20 +34,34 @@ public final class MessagingAttributesExtractor<REQUEST, RESPONSE>
static final String TEMP_DESTINATION_NAME = "(temporary)";
/**
* Creates the messaging attributes extractor for the given {@link MessageOperation operation}.
* Creates the messaging attributes extractor for the given {@link MessageOperation operation}
* with default configuration.
*/
public static <REQUEST, RESPONSE> MessagingAttributesExtractor<REQUEST, RESPONSE> create(
MessagingAttributesGetter<REQUEST, RESPONSE> getter, MessageOperation operation) {
return new MessagingAttributesExtractor<>(getter, operation);
return builder(getter, operation).build();
}
/**
* Returns a new {@link MessagingAttributesExtractorBuilder} for the given {@link MessageOperation
* operation} that can be used to configure the messaging attributes extractor.
*/
public static <REQUEST, RESPONSE> MessagingAttributesExtractorBuilder<REQUEST, RESPONSE> builder(
MessagingAttributesGetter<REQUEST, RESPONSE> getter, MessageOperation operation) {
return new MessagingAttributesExtractorBuilder<>(getter, operation);
}
private final MessagingAttributesGetter<REQUEST, RESPONSE> getter;
private final MessageOperation operation;
private final List<String> capturedHeaders;
private MessagingAttributesExtractor(
MessagingAttributesGetter<REQUEST, RESPONSE> getter, MessageOperation operation) {
MessagingAttributesExtractor(
MessagingAttributesGetter<REQUEST, RESPONSE> getter,
MessageOperation operation,
List<String> capturedHeaders) {
this.getter = getter;
this.operation = operation;
this.capturedHeaders = lowercase(capturedHeaders);
}
@SuppressWarnings("deprecation") // operationName
@ -89,6 +106,13 @@ public final class MessagingAttributesExtractor<REQUEST, RESPONSE>
@Nullable Throwable error) {
internalSet(
attributes, SemanticAttributes.MESSAGING_MESSAGE_ID, getter.messageId(request, response));
for (String name : capturedHeaders) {
List<String> values = getter.header(request, name);
if (!values.isEmpty()) {
internalSet(attributes, attributeKey(name), values);
}
}
}
/**

View File

@ -0,0 +1,47 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.api.instrumenter.messaging;
import static java.util.Collections.emptyList;
import java.util.List;
/** A builder of {@link MessagingAttributesExtractor}. */
public final class MessagingAttributesExtractorBuilder<REQUEST, RESPONSE> {
final MessagingAttributesGetter<REQUEST, RESPONSE> getter;
final MessageOperation operation;
List<String> capturedHeaders = emptyList();
MessagingAttributesExtractorBuilder(
MessagingAttributesGetter<REQUEST, RESPONSE> getter, MessageOperation operation) {
this.getter = getter;
this.operation = operation;
}
/**
* Configures the messaging headers that will be captured as span attributes.
*
* <p>The messaging header values will be captured under the {@code messaging.header.<name>}
* attribute key. The {@code <name>} part in the attribute key is the normalized header name:
* lowercase, with dashes replaced by underscores.
*
* @param capturedHeaders A list of messaging header names.
*/
public MessagingAttributesExtractorBuilder<REQUEST, RESPONSE> setCapturedHeaders(
List<String> capturedHeaders) {
this.capturedHeaders = capturedHeaders;
return this;
}
/**
* Returns a new {@link MessagingAttributesExtractor} with the settings of this {@link
* MessagingAttributesExtractorBuilder}.
*/
public MessagingAttributesExtractor<REQUEST, RESPONSE> build() {
return new MessagingAttributesExtractor<>(getter, operation, capturedHeaders);
}
}

View File

@ -5,6 +5,8 @@
package io.opentelemetry.instrumentation.api.instrumenter.messaging;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
/**
@ -47,4 +49,15 @@ public interface MessagingAttributesGetter<REQUEST, RESPONSE> {
@Nullable
String messageId(REQUEST request, @Nullable RESPONSE response);
/**
* Extracts all values of header named {@code name} from the request, or an empty list if there
* were none.
*
* <p>Implementations of this method <b>must not</b> return a null value; an empty list should be
* returned instead.
*/
default List<String> header(REQUEST request, String name) {
return Collections.emptyList();
}
}

View File

@ -8,6 +8,8 @@ package io.opentelemetry.javaagent.instrumentation.jms;
import static java.util.logging.Level.FINE;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import java.util.Collections;
import java.util.List;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.jms.JMSException;
@ -86,9 +88,22 @@ public enum JmsMessageAttributesGetter
public String messageId(MessageWithDestination messageWithDestination, Void unused) {
try {
return messageWithDestination.message().getJMSMessageID();
} catch (JMSException e) {
logger.log(FINE, "Failure getting JMS message id", e);
} catch (JMSException exception) {
logger.log(FINE, "Failure getting JMS message id", exception);
return null;
}
}
@Override
public List<String> header(MessageWithDestination messageWithDestination, String name) {
try {
String value = messageWithDestination.message().getStringProperty(name);
if (value != null) {
return Collections.singletonList(value);
}
} catch (JMSException exception) {
logger.log(FINE, "Failure getting JMS message header", exception);
}
return Collections.emptyList();
}
}

View File

@ -10,6 +10,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
@ -31,7 +32,7 @@ public final class JmsSingletons {
GlobalOpenTelemetry.get(),
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
.addAttributesExtractor(buildMessagingAttributesExtractor(getter, operation))
.buildProducerInstrumenter(MessagePropertySetter.INSTANCE);
}
@ -44,7 +45,7 @@ public final class JmsSingletons {
GlobalOpenTelemetry.get(),
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
.addAttributesExtractor(buildMessagingAttributesExtractor(getter, operation))
.setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}
@ -57,10 +58,19 @@ public final class JmsSingletons {
GlobalOpenTelemetry.get(),
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
.addAttributesExtractor(buildMessagingAttributesExtractor(getter, operation))
.buildConsumerInstrumenter(MessagePropertyGetter.INSTANCE);
}
private static MessagingAttributesExtractor<MessageWithDestination, Void>
buildMessagingAttributesExtractor(
MessagingAttributesGetter<MessageWithDestination, Void> getter,
MessageOperation operation) {
return MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
.build();
}
public static Instrumenter<MessageWithDestination, Void> producerInstrumenter() {
return PRODUCER_INSTRUMENTER;
}

View File

@ -267,7 +267,39 @@ class Jms1Test extends AgentInstrumentationSpecification {
session.createTemporaryTopic() | "topic" | "(temporary)"
}
static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) {
def "capture message header as span attribute"() {
setup:
def destinationName = "someQueue"
def destinationType = "queue"
def destination = session.createQueue(destinationName)
def producer = session.createProducer(destination)
def consumer = session.createConsumer(destination)
def message = session.createTextMessage(messageText)
message.setStringProperty("test-message-header", "test")
message.setIntProperty("test-message-int-header", 1234)
producer.send(message)
TextMessage receivedMessage = consumer.receive()
String messageId = receivedMessage.getJMSMessageID()
expect:
receivedMessage.text == messageText
assertTraces(2) {
trace(0, 1) {
producerSpan(it, 0, destinationType, destinationName, true)
}
trace(1, 1) {
consumerSpan(it, 0, destinationType, destinationName, messageId, null, "receive", true)
}
}
cleanup:
producer.close()
consumer.close()
}
static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName, boolean testHeaders = false) {
trace.span(index) {
name destinationName + " send"
kind PRODUCER
@ -280,6 +312,10 @@ class Jms1Test extends AgentInstrumentationSpecification {
"$SemanticAttributes.MESSAGING_TEMP_DESTINATION" true
}
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
"messaging.header.test_message_int_header" { it == ["1234"] }
}
}
}
}
@ -287,7 +323,7 @@ class Jms1Test extends AgentInstrumentationSpecification {
// passing messageId = null will verify message.id is not captured,
// passing messageId = "" will verify message.id is captured (but won't verify anything about the value),
// any other value for messageId will verify that message.id is captured and has that same value
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation) {
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation, boolean testHeaders = false) {
trace.span(index) {
name destinationName + " " + operation
kind CONSUMER
@ -308,6 +344,10 @@ class Jms1Test extends AgentInstrumentationSpecification {
if (destinationName == "(temporary)") {
"$SemanticAttributes.MESSAGING_TEMP_DESTINATION" true
}
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
"messaging.header.test_message_int_header" { it == ["1234"] }
}
}
}
}

View File

@ -65,27 +65,28 @@ public class KafkaConsumerInstrumentation implements TypeInstrumentation {
Context parentContext = currentContext();
if (consumerReceiveInstrumenter().shouldStart(parentContext, records)) {
Context context =
InstrumenterUtil.startAndEnd(
consumerReceiveInstrumenter(),
parentContext,
records,
null,
error,
timer.startTime(),
timer.now());
// we're storing the context of the receive span so that process spans can use it as parent
// context even though the span has ended
// this is the suggested behavior according to the spec batch receive scenario:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving
VirtualField<ConsumerRecords<?, ?>, Context> consumerRecordsContext =
VirtualField.find(ConsumerRecords.class, Context.class);
consumerRecordsContext.set(records, context);
// disable process tracing and store the receive span for each individual record too
boolean previousValue = KafkaClientsConsumerProcessTracing.setEnabled(false);
try {
Context context =
InstrumenterUtil.startAndEnd(
consumerReceiveInstrumenter(),
parentContext,
records,
null,
error,
timer.startTime(),
timer.now());
// we're storing the context of the receive span so that process spans can use it as
// parent
// context even though the span has ended
// this is the suggested behavior according to the spec batch receive scenario:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving
VirtualField<ConsumerRecords<?, ?>, Context> consumerRecordsContext =
VirtualField.find(ConsumerRecords.class, Context.class);
consumerRecordsContext.set(records, context);
VirtualField<ConsumerRecord<?, ?>, Context> consumerRecordContext =
VirtualField.find(ConsumerRecord.class, Context.class);
for (ConsumerRecord<?, ?> record : records) {

View File

@ -28,6 +28,7 @@ public final class KafkaSingletons {
static {
KafkaInstrumenterFactory instrumenterFactory =
new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME)
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
.setCaptureExperimentalSpanAttributes(
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))

View File

@ -7,6 +7,7 @@ package io.opentelemetry.instrumentation.kafkaclients
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.nio.charset.StandardCharsets
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.Duration
@ -18,11 +19,15 @@ import static io.opentelemetry.api.trace.SpanKind.PRODUCER
class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
def "test kafka produce and consume"() {
def "test kafka produce and consume, test headers: #testHeaders"() {
when:
String greeting = "Hello Kafka!"
runWithSpan("parent") {
producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex ->
def producerRecord = new ProducerRecord(SHARED_TOPIC, greeting)
if (testHeaders) {
producerRecord.headers().add("test-message-header", "test".getBytes(StandardCharsets.UTF_8))
}
producer.send(producerRecord) { meta, ex ->
if (ex == null) {
runWithSpan("producer callback") {}
} else {
@ -63,6 +68,9 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
}
}
span(2) {
@ -83,6 +91,9 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
}
}
span(1) {
@ -99,6 +110,9 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"kafka.offset" Long
"kafka.record.queue_time_ms" { it >= 0 }
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
}
}
span(2) {
@ -107,6 +121,9 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
}
}
}
where:
testHeaders << [false, true]
}
def "test pass through tombstone"() {

View File

@ -5,6 +5,8 @@
package io.opentelemetry.instrumentation.kafkaclients;
import static java.util.Collections.emptyList;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
@ -23,6 +25,7 @@ public final class KafkaTelemetryBuilder {
new ArrayList<>();
private final List<AttributesExtractor<ConsumerRecord<?, ?>, Void>> consumerAttributesExtractors =
new ArrayList<>();
private List<String> capturedHeaders = emptyList();
private boolean captureExperimentalSpanAttributes = false;
private boolean propagationEnabled = true;
@ -42,6 +45,16 @@ public final class KafkaTelemetryBuilder {
return this;
}
/**
* Configures the messaging headers that will be captured as span attributes.
*
* @param capturedHeaders A list of messaging header names.
*/
public KafkaTelemetryBuilder setCapturedHeaders(List<String> capturedHeaders) {
this.capturedHeaders = capturedHeaders;
return this;
}
/**
* Sets whether experimental attributes should be set to spans. These attributes may be changed or
* removed in the future, so only enable this if you know you do not require attributes filled by
@ -65,6 +78,7 @@ public final class KafkaTelemetryBuilder {
public KafkaTelemetry build() {
KafkaInstrumenterFactory instrumenterFactory =
new KafkaInstrumenterFactory(openTelemetry, INSTRUMENTATION_NAME)
.setCapturedHeaders(capturedHeaders)
.setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes)
.setPropagationEnabled(propagationEnabled);

View File

@ -7,6 +7,7 @@ package io.opentelemetry.instrumentation.kafkaclients
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.nio.charset.StandardCharsets
import org.apache.kafka.clients.producer.ProducerRecord
import spock.lang.Unroll
@ -15,21 +16,27 @@ import java.time.Duration
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
import static java.util.Collections.singletonList
class WrappersTest extends KafkaClientBaseTest implements LibraryTestTrait {
@Unroll
def "test wrappers"() throws Exception {
def "test wrappers, test headers: #testHeaders"() throws Exception {
KafkaTelemetry telemetry = KafkaTelemetry.builder(getOpenTelemetry())
// TODO run tests both with and without experimental span attributes
.setCaptureExperimentalSpanAttributes(true)
.build()
.setCapturedHeaders(singletonList("test-message-header"))
// TODO run tests both with and without experimental span attributes
.setCaptureExperimentalSpanAttributes(true)
.build()
when:
String greeting = "Hello Kafka!"
def wrappedProducer = telemetry.wrap(producer)
runWithSpan("parent") {
wrappedProducer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex ->
def producerRecord = new ProducerRecord(SHARED_TOPIC, greeting)
if (testHeaders) {
producerRecord.headers().add("test-message-header", "test".getBytes(StandardCharsets.UTF_8))
}
wrappedProducer.send(producerRecord) { meta, ex ->
if (ex == null) {
runWithSpan("producer callback") {}
} else {
@ -65,6 +72,9 @@ class WrappersTest extends KafkaClientBaseTest implements LibraryTestTrait {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
}
}
span(2) {
@ -80,6 +90,9 @@ class WrappersTest extends KafkaClientBaseTest implements LibraryTestTrait {
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"kafka.offset" Long
"kafka.record.queue_time_ms" { it >= 0 }
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
}
}
span(3) {
@ -89,6 +102,9 @@ class WrappersTest extends KafkaClientBaseTest implements LibraryTestTrait {
}
}
}
where:
testHeaders << [false, true]
}
}

View File

@ -7,8 +7,11 @@ package io.opentelemetry.instrumentation.kafka.internal;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
@ -82,4 +85,14 @@ enum KafkaBatchProcessAttributesGetter
public String messageId(ConsumerRecords<?, ?> records, @Nullable Void unused) {
return null;
}
@Override
public List<String> header(ConsumerRecords<?, ?> records, String name) {
return StreamSupport.stream(records.spliterator(), false)
.flatMap(
consumerRecord ->
StreamSupport.stream(consumerRecord.headers().headers(name).spliterator(), false))
.map(header -> new String(header.value(), StandardCharsets.UTF_8))
.collect(Collectors.toList());
}
}

View File

@ -7,6 +7,10 @@ package io.opentelemetry.instrumentation.kafka.internal;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -78,4 +82,11 @@ public enum KafkaConsumerAttributesGetter
public String messageId(ConsumerRecord<?, ?> consumerRecord, @Nullable Void unused) {
return null;
}
@Override
public List<String> header(ConsumerRecord<?, ?> consumerRecord, String name) {
return StreamSupport.stream(consumerRecord.headers().headers(name).spliterator(), false)
.map(header -> new String(header.value(), StandardCharsets.UTF_8))
.collect(Collectors.toList());
}
}

View File

@ -5,6 +5,8 @@
package io.opentelemetry.instrumentation.kafka.internal;
import static java.util.Collections.emptyList;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor;
@ -14,8 +16,10 @@ import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
@ -29,6 +33,7 @@ public final class KafkaInstrumenterFactory {
private final OpenTelemetry openTelemetry;
private final String instrumentationName;
private ErrorCauseExtractor errorCauseExtractor = ErrorCauseExtractor.jdk();
private List<String> capturedHeaders = emptyList();
private boolean captureExperimentalSpanAttributes = false;
private boolean propagationEnabled = true;
private boolean messagingReceiveInstrumentationEnabled = false;
@ -43,6 +48,11 @@ public final class KafkaInstrumenterFactory {
return this;
}
public KafkaInstrumenterFactory setCapturedHeaders(List<String> capturedHeaders) {
this.capturedHeaders = capturedHeaders;
return this;
}
public KafkaInstrumenterFactory setCaptureExperimentalSpanAttributes(
boolean captureExperimentalSpanAttributes) {
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
@ -74,7 +84,8 @@ public final class KafkaInstrumenterFactory {
openTelemetry,
instrumentationName,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
.addAttributesExtractor(
buildMessagingAttributesExtractor(getter, operation, capturedHeaders))
.addAttributesExtractors(extractors)
.addAttributesExtractor(new KafkaProducerAdditionalAttributesExtractor())
.setErrorCauseExtractor(errorCauseExtractor)
@ -89,7 +100,8 @@ public final class KafkaInstrumenterFactory {
openTelemetry,
instrumentationName,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
.addAttributesExtractor(
buildMessagingAttributesExtractor(getter, operation, capturedHeaders))
.setErrorCauseExtractor(errorCauseExtractor)
.setEnabled(messagingReceiveInstrumentationEnabled)
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
@ -110,7 +122,8 @@ public final class KafkaInstrumenterFactory {
openTelemetry,
instrumentationName,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
.addAttributesExtractor(
buildMessagingAttributesExtractor(getter, operation, capturedHeaders))
.addAttributesExtractor(new KafkaConsumerAdditionalAttributesExtractor())
.addAttributesExtractors(extractors)
.setErrorCauseExtractor(errorCauseExtractor);
@ -139,11 +152,21 @@ public final class KafkaInstrumenterFactory {
openTelemetry,
instrumentationName,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
.addAttributesExtractor(
buildMessagingAttributesExtractor(getter, operation, capturedHeaders))
.addSpanLinksExtractor(
new KafkaBatchProcessSpanLinksExtractor(
openTelemetry.getPropagators().getTextMapPropagator()))
.setErrorCauseExtractor(errorCauseExtractor)
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}
private static <T> MessagingAttributesExtractor<T, Void> buildMessagingAttributesExtractor(
MessagingAttributesGetter<T, Void> getter,
MessageOperation operation,
List<String> capturedHeaders) {
return MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(capturedHeaders)
.build();
}
}

View File

@ -7,6 +7,10 @@ package io.opentelemetry.instrumentation.kafka.internal;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.kafka.clients.producer.ProducerRecord;
@ -79,4 +83,11 @@ public enum KafkaProducerAttributesGetter
public String messageId(ProducerRecord<?, ?> producerRecord, @Nullable Void unused) {
return null;
}
@Override
public List<String> header(ProducerRecord<?, ?> producerRecord, String name) {
return StreamSupport.stream(producerRecord.headers().headers(name).spliterator(), false)
.map(header -> new String(header.value(), StandardCharsets.UTF_8))
.collect(Collectors.toList());
}
}

View File

@ -7,8 +7,11 @@ package io.opentelemetry.instrumentation.kafka.internal;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
@ -88,4 +91,14 @@ public enum KafkaReceiveAttributesGetter
public String messageId(ConsumerRecords<?, ?> consumerRecords, @Nullable Void unused) {
return null;
}
@Override
public List<String> header(ConsumerRecords<?, ?> records, String name) {
return StreamSupport.stream(records.spliterator(), false)
.flatMap(
consumerRecord ->
StreamSupport.stream(consumerRecord.headers().headers(name).spliterator(), false))
.map(header -> new String(header.value(), StandardCharsets.UTF_8))
.collect(Collectors.toList());
}
}

View File

@ -18,6 +18,7 @@ public final class KafkaStreamsSingletons {
private static final Instrumenter<ConsumerRecord<?, ?>, Void> INSTRUMENTER =
new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME)
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
.setCaptureExperimentalSpanAttributes(
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))

View File

@ -7,6 +7,7 @@ package io.opentelemetry.javaagent.instrumentation.rabbitmq;
import com.google.auto.value.AutoValue;
import com.rabbitmq.client.Channel;
import java.util.Map;
@AutoValue
public abstract class ChannelAndMethod {
@ -18,4 +19,14 @@ public abstract class ChannelAndMethod {
abstract Channel getChannel();
abstract String getMethod();
private Map<String, Object> headers;
public Map<String, Object> getHeaders() {
return headers;
}
public void setHeaders(Map<String, Object> headers) {
this.headers = headers;
}
}

View File

@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rabbitmq;
public class RabbitChannelAndMethodHolder {
private ChannelAndMethod channelAndMethod;
public ChannelAndMethod getChannelAndMethod() {
return channelAndMethod;
}
public void setChannelAndMethod(ChannelAndMethod channelAndMethod) {
this.channelAndMethod = channelAndMethod;
}
}

View File

@ -7,6 +7,8 @@ package io.opentelemetry.javaagent.instrumentation.rabbitmq;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
enum RabbitChannelAttributesGetter implements MessagingAttributesGetter<ChannelAndMethod, Void> {
@ -74,4 +76,15 @@ enum RabbitChannelAttributesGetter implements MessagingAttributesGetter<ChannelA
public String messageId(ChannelAndMethod channelAndMethod, @Nullable Void unused) {
return null;
}
@Override
public List<String> header(ChannelAndMethod channelAndMethod, String name) {
if (channelAndMethod.getHeaders() != null) {
Object value = channelAndMethod.getHeaders().get(name);
if (value != null) {
return Collections.singletonList(value.toString());
}
}
return Collections.emptyList();
}
}

View File

@ -113,6 +113,7 @@ public class RabbitChannelInstrumentation implements TypeInstrumentation {
context = channelInstrumenter().start(parentContext, request);
CURRENT_RABBIT_CONTEXT.set(context);
helper().setChannelAndMethod(context, request);
scope = context.makeCurrent();
}
@ -157,7 +158,7 @@ public class RabbitChannelInstrumentation implements TypeInstrumentation {
if (props == null) {
props = MessageProperties.MINIMAL_BASIC;
}
helper().onProps(span, props);
helper().onProps(context, span, props);
// We need to copy the BasicProperties and provide a header map we can modify
Map<String, Object> headers = props.getHeaders();

View File

@ -7,6 +7,8 @@ package io.opentelemetry.javaagent.instrumentation.rabbitmq;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
enum RabbitDeliveryAttributesGetter implements MessagingAttributesGetter<DeliveryRequest, Void> {
@ -85,4 +87,13 @@ enum RabbitDeliveryAttributesGetter implements MessagingAttributesGetter<Deliver
public String messageId(DeliveryRequest request, @Nullable Void unused) {
return null;
}
@Override
public List<String> header(DeliveryRequest request, String name) {
Object value = request.getProperties().getHeaders().get(name);
if (value != null) {
return Collections.singletonList(value.toString());
}
return Collections.emptyList();
}
}

View File

@ -5,6 +5,8 @@
package io.opentelemetry.javaagent.instrumentation.rabbitmq;
import static io.opentelemetry.javaagent.instrumentation.rabbitmq.RabbitSingletons.CHANNEL_AND_METHOD_CONTEXT_KEY;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Command;
import io.opentelemetry.api.GlobalOpenTelemetry;
@ -41,13 +43,18 @@ public class RabbitInstrumenterHelper {
}
}
public void onProps(Span span, AMQP.BasicProperties props) {
public void onProps(Context context, Span span, AMQP.BasicProperties props) {
if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) {
Integer deliveryMode = props.getDeliveryMode();
if (deliveryMode != null) {
span.setAttribute("rabbitmq.delivery_mode", deliveryMode);
}
}
RabbitChannelAndMethodHolder channelContext = context.get(CHANNEL_AND_METHOD_CONTEXT_KEY);
ChannelAndMethod channelAndMethod = channelContext.getChannelAndMethod();
if (channelAndMethod != null) {
channelAndMethod.setHeaders(props.getHeaders());
}
}
private static String normalizeExchangeName(String exchange) {
@ -68,4 +75,11 @@ public class RabbitInstrumenterHelper {
public void inject(Context context, Map<String, Object> headers, MapSetter setter) {
GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(context, headers, setter);
}
public void setChannelAndMethod(Context context, ChannelAndMethod channelAndMethod) {
RabbitChannelAndMethodHolder holder = context.get(CHANNEL_AND_METHOD_CONTEXT_KEY);
if (holder != null) {
holder.setChannelAndMethod(channelAndMethod);
}
}
}

View File

@ -8,6 +8,8 @@ package io.opentelemetry.javaagent.instrumentation.rabbitmq;
import com.rabbitmq.client.GetResponse;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
enum RabbitReceiveAttributesGetter
@ -84,4 +86,16 @@ enum RabbitReceiveAttributesGetter
public String messageId(ReceiveRequest request, @Nullable GetResponse response) {
return null;
}
@Override
public List<String> header(ReceiveRequest request, String name) {
GetResponse response = request.getResponse();
if (response != null) {
Object value = request.getResponse().getProps().getHeaders().get(name);
if (value != null) {
return Collections.singletonList(value.toString());
}
}
return Collections.emptyList();
}
}

View File

@ -10,12 +10,15 @@ import static io.opentelemetry.api.trace.SpanKind.PRODUCER;
import com.rabbitmq.client.GetResponse;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesExtractor;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import java.util.ArrayList;
import java.util.List;
@ -29,6 +32,8 @@ public final class RabbitSingletons {
private static final Instrumenter<ChannelAndMethod, Void> channelInstrumenter;
private static final Instrumenter<ReceiveRequest, GetResponse> receiveInstrumenter;
private static final Instrumenter<DeliveryRequest, Void> deliverInstrumenter;
static final ContextKey<RabbitChannelAndMethodHolder> CHANNEL_AND_METHOD_CONTEXT_KEY =
ContextKey.named("opentelemetry-rabbitmq-channel-and-method-context-key");
static {
channelInstrumenter = createChannelInstrumenter();
@ -52,10 +57,13 @@ public final class RabbitSingletons {
return Instrumenter.<ChannelAndMethod, Void>builder(
GlobalOpenTelemetry.get(), instrumentationName, ChannelAndMethod::getMethod)
.addAttributesExtractor(
MessagingAttributesExtractor.create(
buildMessagingAttributesExtractor(
RabbitChannelAttributesGetter.INSTANCE, MessageOperation.SEND))
.addAttributesExtractor(
NetClientAttributesExtractor.create(new RabbitChannelNetAttributesGetter()))
.addContextCustomizer(
(context, request, startAttributes) ->
context.with(CHANNEL_AND_METHOD_CONTEXT_KEY, new RabbitChannelAndMethodHolder()))
.buildInstrumenter(
channelAndMethod ->
channelAndMethod.getMethod().equals("Channel.basicPublish") ? PRODUCER : CLIENT);
@ -64,7 +72,7 @@ public final class RabbitSingletons {
private static Instrumenter<ReceiveRequest, GetResponse> createReceiveInstrumenter() {
List<AttributesExtractor<ReceiveRequest, GetResponse>> extractors = new ArrayList<>();
extractors.add(
MessagingAttributesExtractor.create(
buildMessagingAttributesExtractor(
RabbitReceiveAttributesGetter.INSTANCE, MessageOperation.RECEIVE));
extractors.add(NetClientAttributesExtractor.create(new RabbitReceiveNetAttributesGetter()));
if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) {
@ -80,7 +88,7 @@ public final class RabbitSingletons {
private static Instrumenter<DeliveryRequest, Void> createDeliverInstrumenter() {
List<AttributesExtractor<DeliveryRequest, Void>> extractors = new ArrayList<>();
extractors.add(
MessagingAttributesExtractor.create(
buildMessagingAttributesExtractor(
RabbitDeliveryAttributesGetter.INSTANCE, MessageOperation.PROCESS));
extractors.add(new RabbitDeliveryExtraAttributesExtractor());
if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) {
@ -93,5 +101,12 @@ public final class RabbitSingletons {
.buildConsumerInstrumenter(DeliveryRequestGetter.INSTANCE);
}
private static <T, V> MessagingAttributesExtractor<T, V> buildMessagingAttributesExtractor(
MessagingAttributesGetter<T, V> getter, MessageOperation operation) {
return MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
.build();
}
private RabbitSingletons() {}
}

View File

@ -16,6 +16,8 @@ import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.amqp.core.Queue
@ -273,6 +275,44 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb
}
}
def "capture message header as span attributes"() {
setup:
String queueName = channel.queueDeclare().getQueue()
def properties = new AMQP.BasicProperties.Builder().headers(["test-message-header": "test"]).build()
channel.basicPublish("", queueName, properties, "Hello, world!".getBytes())
def latch = new CountDownLatch(1)
def deliveries = []
Consumer callback = new DefaultConsumer(channel) {
@Override
void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties props, byte[] body) throws IOException {
deliveries << new String(body)
latch.countDown()
}
}
channel.basicConsume(queueName, callback)
latch.await(10, TimeUnit.SECONDS)
expect:
deliveries[0] == "Hello, world!"
and:
assertTraces(3) {
traces.subList(1, 3).sort(orderByRootSpanKind(PRODUCER, CLIENT))
trace(0, 1) {
rabbitSpan(it, 0, null, null, null, "queue.declare")
}
trace(1, 2) {
rabbitSpan(it, 0, "<default>", null, "send", "<default>", true)
rabbitSpan(it, 1, "<default>", null, "process", "<generated>", span(0), null, null, null, false, true)
}
trace(2, 1) {
rabbitSpan(it, 0, null, null, null, "basic.consume")
}
}
}
def rabbitSpan(
TraceAssert trace,
String exchange,
@ -283,12 +323,24 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb
Object linkSpan = null,
Throwable exception = null,
String errorMsg = null,
Boolean expectTimestamp = false
boolean expectTimestamp = false
) {
rabbitSpan(trace, 0, exchange, routingKey, operation, resource, parentSpan, linkSpan, exception, errorMsg, expectTimestamp)
}
def rabbitSpan(
TraceAssert trace,
int index,
String exchange,
String routingKey,
String operation,
String resource,
boolean testHeaders
) {
rabbitSpan(trace, index, exchange, routingKey, operation, resource, null, null, null, null, false, testHeaders)
}
def rabbitSpan(
TraceAssert trace,
int index,
String exchange,
@ -299,7 +351,8 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb
Object linkSpan = null,
Throwable exception = null,
String errorMsg = null,
Boolean expectTimestamp = false
boolean expectTimestamp = false,
boolean testHeaders = false
) {
def spanName = resource
@ -355,6 +408,9 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb
if (expectTimestamp) {
"rabbitmq.record.queue_time_ms" { it instanceof Long && it >= 0 }
}
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
switch (trace.span(index).attributes.get(AttributeKey.stringKey("rabbitmq.command"))) {
case "basic.publish":

View File

@ -7,6 +7,7 @@ package io.opentelemetry.javaagent.instrumentation.rocketmq;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.rocketmq.RocketMqTelemetry;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.hook.SendMessageHook;
@ -15,6 +16,7 @@ public final class RocketMqClientHooks {
private static final RocketMqTelemetry TELEMETRY =
RocketMqTelemetry.builder(GlobalOpenTelemetry.get())
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
.setPropagationEnabled(
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.rocketmq-client.propagation", true))

View File

@ -7,6 +7,8 @@ package io.opentelemetry.instrumentation.rocketmq;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.rocketmq.common.message.MessageExt;
@ -75,4 +77,13 @@ enum RocketMqConsumerAttributeGetter implements MessagingAttributesGetter<Messag
public String messageId(MessageExt request, @Nullable Void unused) {
return request.getMsgId();
}
@Override
public List<String> header(MessageExt request, String name) {
String value = request.getProperties().get(name);
if (value != null) {
return Collections.singletonList(value);
}
return Collections.emptyList();
}
}

View File

@ -16,7 +16,9 @@ import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import java.util.List;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.common.message.MessageExt;
@ -26,6 +28,7 @@ class RocketMqInstrumenterFactory {
static Instrumenter<SendMessageContext, Void> createProducerInstrumenter(
OpenTelemetry openTelemetry,
List<String> capturedHeaders,
boolean captureExperimentalSpanAttributes,
boolean propagationEnabled) {
@ -37,7 +40,8 @@ class RocketMqInstrumenterFactory {
openTelemetry,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation));
.addAttributesExtractor(
buildMessagingAttributesExtractor(getter, operation, capturedHeaders));
if (captureExperimentalSpanAttributes) {
instrumenterBuilder.addAttributesExtractor(
RocketMqProducerExperimentalAttributeExtractor.INSTANCE);
@ -52,6 +56,7 @@ class RocketMqInstrumenterFactory {
static RocketMqConsumerInstrumenter createConsumerInstrumenter(
OpenTelemetry openTelemetry,
List<String> capturedHeaders,
boolean captureExperimentalSpanAttributes,
boolean propagationEnabled) {
@ -63,14 +68,23 @@ class RocketMqInstrumenterFactory {
return new RocketMqConsumerInstrumenter(
createProcessInstrumenter(
openTelemetry, captureExperimentalSpanAttributes, propagationEnabled, false),
openTelemetry,
capturedHeaders,
captureExperimentalSpanAttributes,
propagationEnabled,
false),
createProcessInstrumenter(
openTelemetry, captureExperimentalSpanAttributes, propagationEnabled, true),
openTelemetry,
capturedHeaders,
captureExperimentalSpanAttributes,
propagationEnabled,
true),
batchReceiveInstrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()));
}
private static Instrumenter<MessageExt, Void> createProcessInstrumenter(
OpenTelemetry openTelemetry,
List<String> capturedHeaders,
boolean captureExperimentalSpanAttributes,
boolean propagationEnabled,
boolean batch) {
@ -84,7 +98,8 @@ class RocketMqInstrumenterFactory {
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation));
builder.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation));
builder.addAttributesExtractor(
buildMessagingAttributesExtractor(getter, operation, capturedHeaders));
if (captureExperimentalSpanAttributes) {
builder.addAttributesExtractor(RocketMqConsumerExperimentalAttributeExtractor.INSTANCE);
}
@ -107,6 +122,15 @@ class RocketMqInstrumenterFactory {
}
}
private static <T> MessagingAttributesExtractor<T, Void> buildMessagingAttributesExtractor(
MessagingAttributesGetter<T, Void> getter,
MessageOperation operation,
List<String> capturedHeaders) {
return MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(capturedHeaders)
.build();
}
private static String spanNameOnReceive(Void unused) {
return "multiple_sources receive";
}

View File

@ -7,6 +7,8 @@ package io.opentelemetry.instrumentation.rocketmq;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.producer.SendResult;
@ -80,4 +82,13 @@ enum RocketMqProducerAttributeGetter
SendResult sendResult = request.getSendResult();
return sendResult == null ? null : sendResult.getMsgId();
}
@Override
public List<String> header(SendMessageContext request, String name) {
String value = request.getMessage().getProperties().get(name);
if (value != null) {
return Collections.singletonList(value);
}
return Collections.emptyList();
}
}

View File

@ -7,6 +7,7 @@ package io.opentelemetry.instrumentation.rocketmq;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.List;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
@ -31,14 +32,15 @@ public final class RocketMqTelemetry {
RocketMqTelemetry(
OpenTelemetry openTelemetry,
List<String> capturedHeaders,
boolean captureExperimentalSpanAttributes,
boolean propagationEnabled) {
rocketMqConsumerInstrumenter =
RocketMqInstrumenterFactory.createConsumerInstrumenter(
openTelemetry, captureExperimentalSpanAttributes, propagationEnabled);
openTelemetry, capturedHeaders, captureExperimentalSpanAttributes, propagationEnabled);
rocketMqProducerInstrumenter =
RocketMqInstrumenterFactory.createProducerInstrumenter(
openTelemetry, captureExperimentalSpanAttributes, propagationEnabled);
openTelemetry, capturedHeaders, captureExperimentalSpanAttributes, propagationEnabled);
}
/**

View File

@ -5,13 +5,17 @@
package io.opentelemetry.instrumentation.rocketmq;
import static java.util.Collections.emptyList;
import io.opentelemetry.api.OpenTelemetry;
import java.util.List;
/** A builder of {@link RocketMqTelemetry}. */
public final class RocketMqTelemetryBuilder {
private final OpenTelemetry openTelemetry;
private List<String> capturedHeaders = emptyList();
private boolean captureExperimentalSpanAttributes;
private boolean propagationEnabled = true;
@ -39,12 +43,22 @@ public final class RocketMqTelemetryBuilder {
return this;
}
/**
* Configures the messaging headers that will be captured as span attributes.
*
* @param capturedHeaders A list of messaging header names.
*/
public RocketMqTelemetryBuilder setCapturedHeaders(List<String> capturedHeaders) {
this.capturedHeaders = capturedHeaders;
return this;
}
/**
* Returns a new {@link RocketMqTelemetry} with the settings of this {@link
* RocketMqTelemetryBuilder}.
*/
public RocketMqTelemetry build() {
return new RocketMqTelemetry(
openTelemetry, captureExperimentalSpanAttributes, propagationEnabled);
openTelemetry, capturedHeaders, captureExperimentalSpanAttributes, propagationEnabled);
}
}

View File

@ -10,11 +10,14 @@ import io.opentelemetry.instrumentation.test.LibraryTestTrait
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
import org.apache.rocketmq.client.producer.DefaultMQProducer
import static java.util.Collections.singletonList
class RocketMqClientTest extends AbstractRocketMqClientTest implements LibraryTestTrait {
@Override
void configureMQProducer(DefaultMQProducer producer) {
producer.getDefaultMQProducerImpl().registerSendMessageHook(RocketMqTelemetry.builder(openTelemetry)
.setCapturedHeaders(singletonList("test-message-header"))
.setCaptureExperimentalSpanAttributes(true)
.build().newTracingSendMessageHook())
}
@ -22,6 +25,7 @@ class RocketMqClientTest extends AbstractRocketMqClientTest implements LibraryTe
@Override
void configureMQPushConsumer(DefaultMQPushConsumer consumer) {
consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(RocketMqTelemetry.builder(openTelemetry)
.setCapturedHeaders(singletonList("test-message-header"))
.setCaptureExperimentalSpanAttributes(true)
.build().newTracingConsumeMessageHook())
}

View File

@ -295,4 +295,64 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
}
}
}
def "capture message header as span attributes"() {
when:
runWithSpan("parent") {
def msg = new Message(sharedTopic, "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET))
msg.putUserProperty("test-message-header", "test")
SendResult sendResult = producer.send(msg)
assert sendResult.sendStatus == SendStatus.SEND_OK
}
// waiting longer than assertTraces below does on its own because of CI flakiness
tracingMessageListener.waitForMessages()
then:
assertTraces(1) {
trace(0, 4) {
span(0) {
name "parent"
kind INTERNAL
}
span(1) {
name sharedTopic + " send"
kind PRODUCER
childOf span(0)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq"
"$SemanticAttributes.MESSAGING_DESTINATION" sharedTopic
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"messaging.rocketmq.tags" "TagA"
"messaging.rocketmq.broker_address" String
"messaging.rocketmq.send_result" "SEND_OK"
"messaging.header.test_message_header" { it == ["test"] }
}
}
span(2) {
name sharedTopic + " process"
kind CONSUMER
childOf span(1)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq"
"$SemanticAttributes.MESSAGING_DESTINATION" sharedTopic
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"messaging.rocketmq.tags" "TagA"
"messaging.rocketmq.broker_address" String
"messaging.rocketmq.queue_id" Long
"messaging.rocketmq.queue_offset" Long
"messaging.header.test_message_header" { it == ["test"] }
}
}
span(3) {
name "messageListener"
kind INTERNAL
childOf span(2)
}
}
}
}
}

View File

@ -9,6 +9,7 @@ import static java.util.Collections.singletonList;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.spring.integration.SpringIntegrationTelemetry;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import java.util.List;
import org.springframework.messaging.support.ChannelInterceptor;
@ -23,6 +24,7 @@ public final class SpringIntegrationSingletons {
private static final ChannelInterceptor INTERCEPTOR =
SpringIntegrationTelemetry.builder(GlobalOpenTelemetry.get())
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
.setProducerSpanEnabled(
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.spring-integration.producer.enabled", false))

View File

@ -5,6 +5,8 @@
package io.opentelemetry.instrumentation.spring.integration;
import static java.util.Collections.emptyList;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
@ -12,6 +14,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import java.util.ArrayList;
import java.util.List;
@ -23,6 +26,7 @@ public final class SpringIntegrationTelemetryBuilder {
private final List<AttributesExtractor<MessageWithChannel, Void>> additionalAttributeExtractors =
new ArrayList<>();
private List<String> capturedHeaders = emptyList();
private boolean producerSpanEnabled = false;
SpringIntegrationTelemetryBuilder(OpenTelemetry openTelemetry) {
@ -39,6 +43,16 @@ public final class SpringIntegrationTelemetryBuilder {
return this;
}
/**
* Configures the messaging headers that will be captured as span attributes.
*
* @param capturedHeaders A list of messaging header names.
*/
public SpringIntegrationTelemetryBuilder setCapturedHeaders(List<String> capturedHeaders) {
this.capturedHeaders = capturedHeaders;
return this;
}
/**
* Sets whether additional {@link SpanKind#PRODUCER PRODUCER} span should be emitted by this
* instrumentation.
@ -68,8 +82,10 @@ public final class SpringIntegrationTelemetryBuilder {
SpringIntegrationTelemetryBuilder::consumerSpanName)
.addAttributesExtractors(additionalAttributeExtractors)
.addAttributesExtractor(
MessagingAttributesExtractor.create(
SpringMessagingAttributesGetter.INSTANCE, MessageOperation.PROCESS))
buildMessagingAttributesExtractor(
SpringMessagingAttributesGetter.INSTANCE,
MessageOperation.PROCESS,
capturedHeaders))
.buildConsumerInstrumenter(MessageHeadersGetter.INSTANCE);
Instrumenter<MessageWithChannel, Void> producerInstrumenter =
@ -79,8 +95,10 @@ public final class SpringIntegrationTelemetryBuilder {
SpringIntegrationTelemetryBuilder::producerSpanName)
.addAttributesExtractors(additionalAttributeExtractors)
.addAttributesExtractor(
MessagingAttributesExtractor.create(
SpringMessagingAttributesGetter.INSTANCE, MessageOperation.SEND))
buildMessagingAttributesExtractor(
SpringMessagingAttributesGetter.INSTANCE,
MessageOperation.SEND,
capturedHeaders))
.buildInstrumenter(SpanKindExtractor.alwaysProducer());
return new SpringIntegrationTelemetry(
openTelemetry.getPropagators(),
@ -88,4 +106,14 @@ public final class SpringIntegrationTelemetryBuilder {
producerInstrumenter,
producerSpanEnabled);
}
private static MessagingAttributesExtractor<MessageWithChannel, Void>
buildMessagingAttributesExtractor(
MessagingAttributesGetter<MessageWithChannel, Void> getter,
MessageOperation operation,
List<String> capturedHeaders) {
return MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(capturedHeaders)
.build();
}
}

View File

@ -6,6 +6,8 @@
package io.opentelemetry.instrumentation.spring.integration;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
// this class is needed mostly for correct CONSUMER span suppression
@ -77,4 +79,13 @@ enum SpringMessagingAttributesGetter
public String messageId(MessageWithChannel messageWithChannel, @Nullable Void unused) {
return null;
}
@Override
public List<String> header(MessageWithChannel request, String name) {
Object value = request.getMessage().getHeaders().get(name);
if (value != null) {
return Collections.singletonList(value.toString());
}
return Collections.emptyList();
}
}

View File

@ -10,12 +10,17 @@ import org.springframework.context.annotation.Configuration
import org.springframework.integration.config.GlobalChannelInterceptor
import org.springframework.messaging.support.ChannelInterceptor
import static java.util.Collections.singletonList
@Configuration
class GlobalInterceptorSpringConfig {
@GlobalChannelInterceptor
@Bean
ChannelInterceptor otelInterceptor() {
SpringIntegrationTelemetry.create(GlobalOpenTelemetry.get()).newChannelInterceptor()
SpringIntegrationTelemetry.builder(GlobalOpenTelemetry.get())
.setCapturedHeaders(singletonList("test-message-header"))
.build()
.newChannelInterceptor()
}
}

View File

@ -191,6 +191,41 @@ abstract class AbstractSpringIntegrationTracingTest extends InstrumentationSpeci
channel2.unsubscribe(messageHandler)
}
def "capture message header"() {
given:
def channel = applicationContext.getBean("directChannel", SubscribableChannel)
def messageHandler = new CapturingMessageHandler()
channel.subscribe(messageHandler)
when:
channel.send(MessageBuilder.withPayload("test")
.setHeader("test-message-header", "test")
.build())
then:
def capturedMessage = messageHandler.join()
assertTraces(1) {
trace(0, 2) {
span(0) {
name "application.directChannel process"
kind CONSUMER
}
span(1) {
name "handler"
childOf span(0)
}
def interceptorSpan = span(0)
verifyCorrectSpanWasPropagated(capturedMessage, interceptorSpan)
}
}
cleanup:
channel.unsubscribe(messageHandler)
}
static void verifyCorrectSpanWasPropagated(Message<?> capturedMessage, SpanData parentSpan) {
def propagatedSpan = capturedMessage.headers.get("traceparent") as String
assert propagatedSpan.contains(parentSpan.traceId), "wrong trace id"

View File

@ -10,6 +10,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.instrumentation.jms.JmsMessageAttributesGetter;
import io.opentelemetry.javaagent.instrumentation.jms.MessagePropertyGetter;
import io.opentelemetry.javaagent.instrumentation.jms.MessageWithDestination;
@ -28,7 +29,10 @@ public final class SpringJmsSingletons {
GlobalOpenTelemetry.get(),
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
.addAttributesExtractor(
MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
.build())
.buildConsumerInstrumenter(MessagePropertyGetter.INSTANCE);
}

View File

@ -46,7 +46,7 @@ class SpringListenerTest extends AgentInstrumentationSpecification {
config << [AnnotatedListenerConfig, ManualListenerConfig]
}
static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) {
static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName, boolean testHeaders = false) {
trace.span(index) {
name destinationName + " send"
kind PRODUCER
@ -59,6 +59,10 @@ class SpringListenerTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.MESSAGING_TEMP_DESTINATION" true
}
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
"messaging.header.test_message_int_header" { it == ["1234"] }
}
}
}
}
@ -66,7 +70,7 @@ class SpringListenerTest extends AgentInstrumentationSpecification {
// passing messageId = null will verify message.id is not captured,
// passing messageId = "" will verify message.id is captured (but won't verify anything about the value),
// any other value for messageId will verify that message.id is captured and has that same value
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation) {
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation, boolean testHeaders = false) {
trace.span(index) {
name destinationName + " " + operation
kind CONSUMER
@ -87,6 +91,10 @@ class SpringListenerTest extends AgentInstrumentationSpecification {
if (destinationName == "(temporary)") {
"$SemanticAttributes.MESSAGING_TEMP_DESTINATION" true
}
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
"messaging.header.test_message_int_header" { it == ["1234"] }
}
}
}
}

View File

@ -5,6 +5,8 @@
import com.google.common.io.Files
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import javax.jms.JMSException
import javax.jms.Message
import org.hornetq.api.core.TransportConfiguration
import org.hornetq.api.core.client.HornetQClient
import org.hornetq.api.jms.HornetQJMSClient
@ -17,6 +19,7 @@ import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory
import org.hornetq.core.server.HornetQServer
import org.hornetq.core.server.HornetQServers
import org.springframework.jms.core.JmsTemplate
import org.springframework.jms.core.MessagePostProcessor
import spock.lang.Shared
import javax.jms.Connection
@ -148,4 +151,32 @@ class SpringTemplateTest extends AgentInstrumentationSpecification {
destination | destinationType | destinationName
session.createQueue("SpringTemplateJms2") | "queue" | "SpringTemplateJms2"
}
def "capture message header as span attribute"() {
setup:
template.convertAndSend(destination, messageText, new MessagePostProcessor() {
@Override
Message postProcessMessage(Message message) throws JMSException {
message.setStringProperty("test_message_header", "test")
message.setIntProperty("test_message_int_header", 1234)
return message
}
})
TextMessage receivedMessage = template.receive(destination)
expect:
receivedMessage.text == messageText
assertTraces(2) {
trace(0, 1) {
producerSpan(it, 0, destinationType, destinationName, true)
}
trace(1, 1) {
consumerSpan(it, 0, destinationType, destinationName, receivedMessage.getJMSMessageID(), null, "receive", true)
}
}
where:
destination | destinationType | destinationName
session.createQueue("SpringTemplateJms2") | "queue" | "SpringTemplateJms2"
}
}

View File

@ -5,8 +5,11 @@
package io.opentelemetry.instrumentation.spring.kafka.v2_7;
import static java.util.Collections.emptyList;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
import java.util.List;
/** A builder of {@link SpringKafkaTelemetry}. */
public final class SpringKafkaTelemetryBuilder {
@ -14,6 +17,7 @@ public final class SpringKafkaTelemetryBuilder {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-kafka-2.7";
private final OpenTelemetry openTelemetry;
private List<String> capturedHeaders = emptyList();
private boolean captureExperimentalSpanAttributes = false;
private boolean propagationEnabled = true;
private boolean messagingReceiveInstrumentationEnabled = false;
@ -22,6 +26,11 @@ public final class SpringKafkaTelemetryBuilder {
this.openTelemetry = openTelemetry;
}
public SpringKafkaTelemetryBuilder setCapturedHeaders(List<String> capturedHeaders) {
this.capturedHeaders = capturedHeaders;
return this;
}
public SpringKafkaTelemetryBuilder setCaptureExperimentalSpanAttributes(
boolean captureExperimentalSpanAttributes) {
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
@ -46,6 +55,7 @@ public final class SpringKafkaTelemetryBuilder {
public SpringKafkaTelemetry build() {
KafkaInstrumenterFactory factory =
new KafkaInstrumenterFactory(openTelemetry, INSTRUMENTATION_NAME)
.setCapturedHeaders(capturedHeaders)
.setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes)
.setPropagationEnabled(propagationEnabled)
.setMessagingReceiveInstrumentationEnabled(messagingReceiveInstrumentationEnabled)

View File

@ -6,6 +6,8 @@
package io.opentelemetry.javaagent.instrumentation.spring.rabbit;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.springframework.amqp.core.Message;
@ -73,4 +75,13 @@ enum SpringRabbitMessageAttributesGetter implements MessagingAttributesGetter<Me
public String messageId(Message message, @Nullable Void unused) {
return message.getMessageProperties().getMessageId();
}
@Override
public List<String> header(Message message, String name) {
Object value = message.getMessageProperties().getHeaders().get(name);
if (value != null) {
return Collections.singletonList(value.toString());
}
return Collections.emptyList();
}
}

View File

@ -10,6 +10,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import org.springframework.amqp.core.Message;
public final class SpringRabbitSingletons {
@ -27,7 +28,10 @@ public final class SpringRabbitSingletons {
GlobalOpenTelemetry.get(),
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
.addAttributesExtractor(
MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
.build())
.buildConsumerInstrumenter(MessageHeaderGetter.INSTANCE);
}

View File

@ -7,7 +7,10 @@ import com.rabbitmq.client.ConnectionFactory
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.testing.GlobalTraceUtil
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.springframework.amqp.AmqpException
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.amqp.core.Message
import org.springframework.amqp.core.MessagePostProcessor
import org.springframework.amqp.core.Queue
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.boot.SpringApplication
@ -20,6 +23,7 @@ import org.testcontainers.containers.wait.strategy.Wait
import spock.lang.Shared
import java.time.Duration
import spock.lang.Unroll
import static com.google.common.net.InetAddresses.isInetAddress
import static io.opentelemetry.api.trace.SpanKind.CLIENT
@ -62,15 +66,27 @@ class ContextPropagationTest extends AgentInstrumentationSpecification {
applicationContext?.close()
}
def "should propagate context to consumer"() {
@Unroll
def "should propagate context to consumer, test headers: #testHeaders"() {
given:
def connection = connectionFactory.newConnection()
def channel = connection.createChannel()
when:
runWithSpan("parent") {
applicationContext.getBean(AmqpTemplate)
.convertAndSend(ConsumerConfig.TEST_QUEUE, "test")
if (testHeaders) {
applicationContext.getBean(AmqpTemplate)
.convertAndSend(ConsumerConfig.TEST_QUEUE, (Object) "test", new MessagePostProcessor() {
@Override
Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("test-message-header", "test")
return message
}
})
} else {
applicationContext.getBean(AmqpTemplate)
.convertAndSend(ConsumerConfig.TEST_QUEUE, "test")
}
}
then:
@ -94,6 +110,9 @@ class ContextPropagationTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "queue"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY" String
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
}
}
// spring-cloud-stream-binder-rabbit listener puts all messages into a BlockingQueue immediately after receiving
@ -110,6 +129,9 @@ class ContextPropagationTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY" String
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
}
}
span(3) {
@ -123,6 +145,9 @@ class ContextPropagationTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "queue"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
}
}
span(4) {
@ -150,6 +175,9 @@ class ContextPropagationTest extends AgentInstrumentationSpecification {
cleanup:
channel?.close()
connection?.close()
where:
testHeaders << [false, true]
}
@SpringBootConfiguration

View File

@ -23,6 +23,7 @@ public final class VertxKafkaSingletons {
static {
KafkaInstrumenterFactory factory =
new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME)
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
.setCaptureExperimentalSpanAttributes(
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))

View File

@ -5,6 +5,10 @@
package io.opentelemetry.javaagent.bootstrap.internal;
import static java.util.Collections.emptyList;
import java.util.List;
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
@ -15,6 +19,7 @@ public final class ExperimentalConfig {
new ExperimentalConfig(InstrumentationConfig.get());
private final InstrumentationConfig config;
private final List<String> messagingHeaders;
/** Returns the global agent configuration. */
public static ExperimentalConfig get() {
@ -23,6 +28,8 @@ public final class ExperimentalConfig {
public ExperimentalConfig(InstrumentationConfig config) {
this.config = config;
messagingHeaders =
config.getList("otel.instrumentation.messaging.experimental.capture-headers", emptyList());
}
public boolean controllerTelemetryEnabled() {
@ -64,4 +71,8 @@ public final class ExperimentalConfig {
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled",
!receiveSpansSuppressed);
}
public List<String> getMessagingHeaders() {
return messagingHeaders;
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.testing.messaging;
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.config.ConfigPropertySource;
import java.util.HashMap;
import java.util.Map;
@AutoService(ConfigPropertySource.class)
public class CapturedMessagingHeadersTestConfigSource implements ConfigPropertySource {
@Override
public Map<String, String> getProperties() {
Map<String, String> testConfig = new HashMap<>();
testConfig.put(
"otel.instrumentation.messaging.experimental.capture-headers",
// most tests use "test-message-header", "test_message_header" is used for JMS2 because
// '-' is not allowed in a JMS property name. JMS property name should be a valid java
// identifier.
"test-message-header, test-message-int-header, test_message_header, test_message_int_header");
return testConfig;
}
}