Link JMS receive span with the producer span (#6804)

Resolves #6779

In JMS you can have either the consumer receive span or the consumer
process span (unlike Kafka, where the process span is always there and
the receive span is just an addition) - in scenarios where polling
(receive) is used, I think it makes sense to add links to the producer
span to preserve the producer-consumer connection. Current messaging
semantic conventions don't really describe a situation like this one,
but the https://github.com/open-telemetry/oteps/pull/220 OTEP mentions
that links might be used in a scenario like this one - which makes me
think that adding links here might be a not that bad idea.
This commit is contained in:
Mateusz Rzeszutek 2022-10-05 18:11:28 +02:00 committed by GitHub
parent da3eecfd9f
commit f195ec0624
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 150 additions and 71 deletions

View File

@ -3,7 +3,6 @@
* SPDX-License-Identifier: Apache-2.0
*/
import com.google.common.io.Files
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
import io.opentelemetry.sdk.trace.data.SpanData
@ -26,6 +25,7 @@ import javax.jms.Message
import javax.jms.MessageListener
import javax.jms.Session
import javax.jms.TextMessage
import java.nio.file.Files
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference
@ -43,7 +43,7 @@ class Jms2Test extends AgentInstrumentationSpecification {
HornetQTextMessage message = session.createTextMessage(messageText)
def setupSpec() {
def tempDir = Files.createTempDir()
def tempDir = Files.createTempDirectory("jmsTempDir").toFile()
tempDir.deleteOnExit()
Configuration config = new ConfigurationImpl()
@ -86,19 +86,34 @@ class Jms2Test extends AgentInstrumentationSpecification {
def producer = session.createProducer(destination)
def consumer = session.createConsumer(destination)
producer.send(message)
runWithSpan("producer parent") {
producer.send(message)
}
TextMessage receivedMessage = consumer.receive()
TextMessage receivedMessage = runWithSpan("consumer parent") {
return consumer.receive() as TextMessage
}
String messageId = receivedMessage.getJMSMessageID()
expect:
receivedMessage.text == messageText
assertTraces(2) {
trace(0, 1) {
producerSpan(it, 0, destinationType, destinationName)
SpanData producerSpanData
trace(0, 2) {
span(0) {
name "producer parent"
hasNoParent()
}
producerSpan(it, 1, destinationType, destinationName, span(0))
producerSpanData = span(1)
}
trace(1, 1) {
consumerSpan(it, 0, destinationType, destinationName, messageId, null, "receive")
trace(1, 2) {
span(0) {
name "consumer parent"
hasNoParent()
}
consumerSpan(it, 1, destinationType, destinationName, messageId, "receive", span(0), producerSpanData)
}
}
@ -124,18 +139,24 @@ class Jms2Test extends AgentInstrumentationSpecification {
@Override
void onMessage(Message message) {
lock.await() // ensure the producer trace is reported first.
messageRef.set(message)
messageRef.set(message as TextMessage)
}
}
producer.send(message)
runWithSpan("parent") {
producer.send(message)
}
lock.countDown()
expect:
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, destinationType, destinationName)
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process")
trace(0, 3) {
span(0) {
name "parent"
hasNoParent()
}
producerSpan(it, 1, destinationType, destinationName, span(0))
consumerSpan(it, 2, destinationType, destinationName, messageRef.get().getJMSMessageID(), "process", span(1))
}
}
// This check needs to go after all traces have been accounted for
@ -158,7 +179,7 @@ class Jms2Test extends AgentInstrumentationSpecification {
def consumer = session.createConsumer(destination)
// Receive with timeout
TextMessage receivedMessage = consumer.receiveNoWait()
Message receivedMessage = consumer.receiveNoWait()
expect:
receivedMessage == null
@ -179,7 +200,7 @@ class Jms2Test extends AgentInstrumentationSpecification {
def consumer = session.createConsumer(destination)
// Receive with timeout
TextMessage receivedMessage = consumer.receive(100)
Message receivedMessage = consumer.receive(100)
expect:
receivedMessage == null
@ -206,19 +227,25 @@ class Jms2Test extends AgentInstrumentationSpecification {
@Override
void onMessage(Message message) {
lock.await() // ensure the producer trace is reported first.
messageRef.set(message)
messageRef.set(message as TextMessage)
}
}
when:
producer.send(destination, message)
runWithSpan("parent") {
producer.send(destination, message)
}
lock.countDown()
then:
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, destinationType, destinationName)
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process")
trace(0, 3) {
span(0) {
name "parent"
hasNoParent()
}
producerSpan(it, 1, destinationType, destinationName, span(0))
consumerSpan(it, 2, destinationType, destinationName, messageRef.get().getJMSMessageID(), "process", span(1))
}
}
// This check needs to go after all traces have been accounted for
@ -236,11 +263,15 @@ class Jms2Test extends AgentInstrumentationSpecification {
session.createTemporaryTopic() | "topic" | "(temporary)"
}
static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) {
static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName, SpanData parentSpan = null) {
trace.span(index) {
name destinationName + " send"
kind PRODUCER
hasNoParent()
if (parentSpan == null) {
hasNoParent()
} else {
childOf(parentSpan)
}
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "jms"
"$SemanticAttributes.MESSAGING_DESTINATION" destinationName
@ -256,14 +287,19 @@ class Jms2Test extends AgentInstrumentationSpecification {
// passing messageId = null will verify message.id is not captured,
// passing messageId = "" will verify message.id is captured (but won't verify anything about the value),
// any other value for messageId will verify that message.id is captured and has that same value
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation) {
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, String operation, SpanData parentSpan, SpanData linkedSpan = null) {
trace.span(index) {
name destinationName + " " + operation
kind CONSUMER
if (parentOrLinkedSpan != null) {
childOf((SpanData) parentOrLinkedSpan)
} else {
if (parentSpan == null) {
hasNoParent()
} else {
childOf(parentSpan)
}
if (linkedSpan == null) {
hasNoLinks()
} else {
hasLink(linkedSpan)
}
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "jms"

View File

@ -10,6 +10,7 @@ import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.
import static io.opentelemetry.javaagent.instrumentation.jms.JmsSingletons.consumerInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import io.opentelemetry.context.Context;
@ -37,10 +38,16 @@ public class JmsMessageConsumerInstrumentation implements TypeInstrumentation {
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("receive").and(takesArguments(0).or(takesArguments(1))).and(isPublic()),
named("receive")
.and(takesArguments(0).or(takesArguments(1)))
.and(returns(named("javax.jms.Message")))
.and(isPublic()),
JmsMessageConsumerInstrumentation.class.getName() + "$ConsumerAdvice");
transformer.applyAdviceToMethod(
named("receiveNoWait").and(takesArguments(0)).and(isPublic()),
named("receiveNoWait")
.and(takesArguments(0))
.and(returns(named("javax.jms.Message")))
.and(isPublic()),
JmsMessageConsumerInstrumentation.class.getName() + "$ConsumerAdvice");
}

View File

@ -12,6 +12,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperat
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
public final class JmsSingletons {
@ -47,6 +48,10 @@ public final class JmsSingletons {
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(buildMessagingAttributesExtractor(getter, operation))
.setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
.addSpanLinksExtractor(
new PropagatorBasedSpanLinksExtractor<>(
GlobalOpenTelemetry.getPropagators().getTextMapPropagator(),
MessagePropertyGetter.INSTANCE))
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}

View File

@ -61,19 +61,34 @@ class Jms1Test extends AgentInstrumentationSpecification {
def producer = session.createProducer(destination)
def consumer = session.createConsumer(destination)
producer.send(message)
runWithSpan("producer parent") {
producer.send(message)
}
TextMessage receivedMessage = consumer.receive()
TextMessage receivedMessage = runWithSpan("consumer parent") {
return consumer.receive() as TextMessage
}
String messageId = receivedMessage.getJMSMessageID()
expect:
receivedMessage.text == messageText
assertTraces(2) {
trace(0, 1) {
producerSpan(it, 0, destinationType, destinationName)
SpanData producerSpanData
trace(0, 2) {
span(0) {
name "producer parent"
hasNoParent()
}
producerSpan(it, 1, destinationType, destinationName, span(0))
producerSpanData = span(1)
}
trace(1, 1) {
consumerSpan(it, 0, destinationType, destinationName, messageId, null, "receive")
trace(1, 2) {
span(0) {
name "consumer parent"
hasNoParent()
}
consumerSpan(it, 1, destinationType, destinationName, messageId, "receive", span(0), producerSpanData)
}
}
@ -99,7 +114,7 @@ class Jms1Test extends AgentInstrumentationSpecification {
@Override
void onMessage(Message message) {
lock.await() // ensure the producer trace is reported first.
messageRef.set(message)
messageRef.set(message as TextMessage)
}
}
@ -110,7 +125,7 @@ class Jms1Test extends AgentInstrumentationSpecification {
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, destinationType, destinationName)
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process")
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), "process", span(0))
}
}
// This check needs to go after all traces have been accounted for
@ -133,7 +148,7 @@ class Jms1Test extends AgentInstrumentationSpecification {
def consumer = session.createConsumer(destination)
// Receive with timeout
TextMessage receivedMessage = consumer.receiveNoWait()
Message receivedMessage = consumer.receiveNoWait()
expect:
receivedMessage == null
@ -154,7 +169,7 @@ class Jms1Test extends AgentInstrumentationSpecification {
def consumer = session.createConsumer(destination)
// Receive with timeout
TextMessage receivedMessage = consumer.receive(100)
Message receivedMessage = consumer.receive(100)
expect:
receivedMessage == null
@ -183,7 +198,7 @@ class Jms1Test extends AgentInstrumentationSpecification {
and:
producer.send(message)
TextMessage receivedMessage = consumer.receive()
TextMessage receivedMessage = consumer.receive() as TextMessage
then:
receivedMessage.text == messageText
@ -196,21 +211,7 @@ class Jms1Test extends AgentInstrumentationSpecification {
producerSpan(it, 0, destinationType, destinationName)
}
trace(1, 1) {
span(0) {
hasNoParent()
name destinationName + " receive"
kind CONSUMER
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "jms"
"$SemanticAttributes.MESSAGING_DESTINATION" destinationName
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" destinationType
"$SemanticAttributes.MESSAGING_MESSAGE_ID" receivedMessage.getJMSMessageID()
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
if (destinationName == "(temporary)") {
"$SemanticAttributes.MESSAGING_TEMP_DESTINATION" true
}
}
}
consumerSpan(it, 0, destinationType, destinationName, "", "receive", null)
}
}
@ -237,19 +238,25 @@ class Jms1Test extends AgentInstrumentationSpecification {
@Override
void onMessage(Message message) {
lock.await() // ensure the producer trace is reported first.
messageRef.set(message)
messageRef.set(message as TextMessage)
}
}
when:
producer.send(destination, message)
runWithSpan("parent") {
producer.send(destination, message)
}
lock.countDown()
then:
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, destinationType, destinationName)
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process")
trace(0, 3) {
span(0) {
name "parent"
hasNoParent()
}
producerSpan(it, 1, destinationType, destinationName, span(0))
consumerSpan(it, 2, destinationType, destinationName, messageRef.get().getJMSMessageID(), "process", span(1))
}
}
// This check needs to go after all traces have been accounted for
@ -278,19 +285,34 @@ class Jms1Test extends AgentInstrumentationSpecification {
def message = session.createTextMessage(messageText)
message.setStringProperty("test-message-header", "test")
message.setIntProperty("test-message-int-header", 1234)
producer.send(message)
runWithSpan("producer parent") {
producer.send(message)
}
TextMessage receivedMessage = consumer.receive()
TextMessage receivedMessage = runWithSpan("consumer parent") {
return consumer.receive() as TextMessage
}
String messageId = receivedMessage.getJMSMessageID()
expect:
receivedMessage.text == messageText
assertTraces(2) {
trace(0, 1) {
producerSpan(it, 0, destinationType, destinationName, true)
SpanData producerSpanData
trace(0, 2) {
span(0) {
name "producer parent"
hasNoParent()
}
producerSpan(it, 1, destinationType, destinationName, span(0), true)
producerSpanData = span(1)
}
trace(1, 1) {
consumerSpan(it, 0, destinationType, destinationName, messageId, null, "receive", true)
trace(1, 2) {
span(0) {
name "consumer parent"
hasNoParent()
}
consumerSpan(it, 1, destinationType, destinationName, messageId, "receive", span(0), producerSpanData, true)
}
}
@ -299,11 +321,15 @@ class Jms1Test extends AgentInstrumentationSpecification {
consumer.close()
}
static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName, boolean testHeaders = false) {
static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName, SpanData parentSpan = null, boolean testHeaders = false) {
trace.span(index) {
name destinationName + " send"
kind PRODUCER
hasNoParent()
if (parentSpan == null) {
hasNoParent()
} else {
childOf(parentSpan)
}
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "jms"
"$SemanticAttributes.MESSAGING_DESTINATION" destinationName
@ -323,14 +349,19 @@ class Jms1Test extends AgentInstrumentationSpecification {
// passing messageId = null will verify message.id is not captured,
// passing messageId = "" will verify message.id is captured (but won't verify anything about the value),
// any other value for messageId will verify that message.id is captured and has that same value
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation, boolean testHeaders = false) {
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, String operation, SpanData parentSpan, SpanData linkedSpan = null, boolean testHeaders = false) {
trace.span(index) {
name destinationName + " " + operation
kind CONSUMER
if (parentOrLinkedSpan != null) {
childOf((SpanData) parentOrLinkedSpan)
} else {
if (parentSpan == null) {
hasNoParent()
} else {
childOf(parentSpan)
}
if (linkedSpan == null) {
hasNoLinks()
} else {
hasLink(linkedSpan)
}
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "jms"