Merge pull request #360 from DataDog/mar-kolya/improve-jms1-receive-handling
Improve jms1/jms2 receive handling
This commit is contained in:
commit
1f5ece8f5a
|
|
@ -22,9 +22,11 @@ import datadog.trace.instrumentation.jms.util.MessagePropertyTextMap;
|
|||
import io.opentracing.Scope;
|
||||
import io.opentracing.Span;
|
||||
import io.opentracing.SpanContext;
|
||||
import io.opentracing.Tracer;
|
||||
import io.opentracing.propagation.Format;
|
||||
import io.opentracing.tag.Tags;
|
||||
import io.opentracing.util.GlobalTracer;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.jms.Message;
|
||||
|
|
@ -54,7 +56,7 @@ public final class JMS1MessageConsumerInstrumentation extends Instrumenter.Confi
|
|||
.transform(
|
||||
DDAdvice.create()
|
||||
.advice(
|
||||
named("receive").and(takesArguments(0)).and(isPublic()),
|
||||
named("receive").and(takesArguments(0).or(takesArguments(1))).and(isPublic()),
|
||||
ConsumerAdvice.class.getName())
|
||||
.advice(
|
||||
named("receiveNoWait").and(takesArguments(0)).and(isPublic()),
|
||||
|
|
@ -73,30 +75,42 @@ public final class JMS1MessageConsumerInstrumentation extends Instrumenter.Confi
|
|||
public static void stopSpan(
|
||||
@Advice.This final MessageConsumer consumer,
|
||||
@Advice.Enter final long startTime,
|
||||
@Advice.Origin final Method method,
|
||||
@Advice.Return final Message message,
|
||||
@Advice.Thrown final Throwable throwable) {
|
||||
|
||||
final SpanContext extractedContext =
|
||||
GlobalTracer.get().extract(Format.Builtin.TEXT_MAP, new MessagePropertyTextMap(message));
|
||||
|
||||
final Scope scope =
|
||||
Tracer.SpanBuilder spanBuilder =
|
||||
GlobalTracer.get()
|
||||
.buildSpan("jms.consume")
|
||||
.asChildOf(extractedContext)
|
||||
.withTag(DDTags.SERVICE_NAME, "jms")
|
||||
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER)
|
||||
.withTag(Tags.COMPONENT.getKey(), "jms1")
|
||||
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER)
|
||||
.withTag("span.origin.type", consumer.getClass().getName())
|
||||
.withStartTimestamp(TimeUnit.MILLISECONDS.toMicros(startTime))
|
||||
.startActive(true);
|
||||
.withStartTimestamp(TimeUnit.MILLISECONDS.toMicros(startTime));
|
||||
|
||||
if (message == null) {
|
||||
spanBuilder = spanBuilder.withTag(DDTags.RESOURCE_NAME, "JMS " + method.getName());
|
||||
} else {
|
||||
spanBuilder =
|
||||
spanBuilder.withTag(
|
||||
DDTags.RESOURCE_NAME, "Consumed from " + toResourceName(message, null));
|
||||
|
||||
final SpanContext extractedContext =
|
||||
GlobalTracer.get()
|
||||
.extract(Format.Builtin.TEXT_MAP, new MessagePropertyTextMap(message));
|
||||
if (extractedContext != null) {
|
||||
spanBuilder = spanBuilder.asChildOf(extractedContext);
|
||||
}
|
||||
}
|
||||
|
||||
final Scope scope = spanBuilder.startActive(true);
|
||||
final Span span = scope.span();
|
||||
|
||||
if (throwable != null) {
|
||||
Tags.ERROR.set(span, Boolean.TRUE);
|
||||
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
|
||||
}
|
||||
span.setTag(DDTags.RESOURCE_NAME, "Consumed from " + toResourceName(message, null));
|
||||
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,8 @@
|
|||
import datadog.trace.agent.test.AgentTestRunner
|
||||
import datadog.trace.agent.test.ListWriterAssert
|
||||
import datadog.trace.api.DDSpanTypes
|
||||
import datadog.trace.api.DDTags
|
||||
import io.opentracing.tag.Tags
|
||||
import org.apache.activemq.ActiveMQConnectionFactory
|
||||
import org.apache.activemq.ActiveMQMessageConsumer
|
||||
import org.apache.activemq.ActiveMQMessageProducer
|
||||
|
|
@ -14,9 +17,15 @@ import javax.jms.TextMessage
|
|||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
import static datadog.trace.agent.test.ListWriterAssert.assertTraces
|
||||
|
||||
class JMS1Test extends AgentTestRunner {
|
||||
@Shared
|
||||
static Session session
|
||||
String messageText = "a message"
|
||||
@Shared
|
||||
Session session
|
||||
|
||||
def message = session.createTextMessage(messageText)
|
||||
|
||||
def setupSpec() {
|
||||
EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker()
|
||||
|
|
@ -28,124 +37,52 @@ class JMS1Test extends AgentTestRunner {
|
|||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
|
||||
}
|
||||
|
||||
def "sending a message to #resourceName generates spans"() {
|
||||
def "sending a message to #jmsResourceName generates spans"() {
|
||||
setup:
|
||||
def producer = session.createProducer(destination)
|
||||
def consumer = session.createConsumer(destination)
|
||||
def message = session.createTextMessage("a message")
|
||||
|
||||
producer.send(message)
|
||||
|
||||
TextMessage receivedMessage = consumer.receive()
|
||||
|
||||
expect:
|
||||
receivedMessage.text == "a message"
|
||||
TEST_WRITER.size() == 2
|
||||
receivedMessage.text == messageText
|
||||
assertTraces(TEST_WRITER, 2) {
|
||||
producerTrace(it, 0, jmsResourceName)
|
||||
trace(1, 1) { // Consumer trace
|
||||
span(0) {
|
||||
childOf TEST_WRITER.firstTrace().get(2)
|
||||
serviceName "jms"
|
||||
operationName "jms.consume"
|
||||
resourceName "Consumed from $jmsResourceName"
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
|
||||
and: // producer trace
|
||||
def trace = TEST_WRITER.firstTrace()
|
||||
trace.size() == 3
|
||||
|
||||
and: // span 0
|
||||
def span0 = trace[0]
|
||||
|
||||
span0.context().operationName == "jms.produce"
|
||||
span0.serviceName == "jms"
|
||||
span0.resourceName == "Produced for $resourceName"
|
||||
span0.type == DDSpanTypes.MESSAGE_PRODUCER
|
||||
!span0.context().getErrorFlag()
|
||||
span0.context().parentId == 0
|
||||
|
||||
|
||||
def tags0 = span0.context().tags
|
||||
tags0["span.kind"] == "producer"
|
||||
tags0["component"] == "jms1"
|
||||
|
||||
tags0["span.origin.type"] == ActiveMQMessageProducer.name
|
||||
|
||||
tags0["thread.name"] != null
|
||||
tags0["thread.id"] != null
|
||||
tags0.size() == 6
|
||||
|
||||
and: // span 1
|
||||
def span1 = trace[1]
|
||||
|
||||
span1.context().operationName == "jms.produce"
|
||||
span1.serviceName == "jms"
|
||||
span1.resourceName == "Produced for $resourceName"
|
||||
span1.type == DDSpanTypes.MESSAGE_PRODUCER
|
||||
!span1.context().getErrorFlag()
|
||||
span1.context().parentId == span0.context().spanId
|
||||
|
||||
|
||||
def tags1 = span1.context().tags
|
||||
tags1["span.kind"] == "producer"
|
||||
tags1["component"] == "jms1"
|
||||
|
||||
tags1["span.origin.type"] == ActiveMQMessageProducer.name
|
||||
|
||||
tags1["thread.name"] != null
|
||||
tags1["thread.id"] != null
|
||||
tags1.size() == 6
|
||||
|
||||
and: // span 2
|
||||
def span2 = trace[2]
|
||||
|
||||
span2.context().operationName == "jms.produce"
|
||||
span2.serviceName == "jms"
|
||||
span2.resourceName == "Produced for $resourceName"
|
||||
span2.type == DDSpanTypes.MESSAGE_PRODUCER
|
||||
!span2.context().getErrorFlag()
|
||||
span2.context().parentId == span1.context().spanId
|
||||
|
||||
|
||||
def tags2 = span2.context().tags
|
||||
tags2["span.kind"] == "producer"
|
||||
tags2["component"] == "jms1"
|
||||
|
||||
tags2["span.origin.type"] == ActiveMQMessageProducer.name
|
||||
|
||||
tags2["thread.name"] != null
|
||||
tags2["thread.id"] != null
|
||||
tags2.size() == 6
|
||||
|
||||
and: // consumer trace
|
||||
def consumerTrace = TEST_WRITER.get(1)
|
||||
consumerTrace.size() == 1
|
||||
|
||||
def consumerSpan = consumerTrace[0]
|
||||
|
||||
consumerSpan.context().operationName == "jms.consume"
|
||||
consumerSpan.serviceName == "jms"
|
||||
consumerSpan.resourceName == "Consumed from $resourceName"
|
||||
consumerSpan.type == DDSpanTypes.MESSAGE_CONSUMER
|
||||
!consumerSpan.context().getErrorFlag()
|
||||
consumerSpan.context().parentId == span2.context().spanId
|
||||
|
||||
|
||||
def consumerTags = consumerSpan.context().tags
|
||||
consumerTags["span.kind"] == "consumer"
|
||||
consumerTags["component"] == "jms1"
|
||||
|
||||
consumerTags["span.origin.type"] == ActiveMQMessageConsumer.name
|
||||
|
||||
consumerTags["thread.name"] != null
|
||||
consumerTags["thread.id"] != null
|
||||
consumerTags.size() == 6
|
||||
tags {
|
||||
defaultTags()
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
|
||||
"${Tags.COMPONENT.key}" "jms1"
|
||||
"${Tags.SPAN_KIND.key}" "consumer"
|
||||
"span.origin.type" ActiveMQMessageConsumer.name
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
producer.close()
|
||||
consumer.close()
|
||||
|
||||
where:
|
||||
destination | resourceName
|
||||
destination | jmsResourceName
|
||||
session.createQueue("someQueue") | "Queue someQueue"
|
||||
session.createTopic("someTopic") | "Topic someTopic"
|
||||
session.createTemporaryQueue() | "Temporary Queue"
|
||||
session.createTemporaryTopic() | "Temporary Topic"
|
||||
}
|
||||
|
||||
def "sending to a MessageListener on #resourceName generates a span"() {
|
||||
def "sending to a MessageListener on #jmsResourceName generates a span"() {
|
||||
setup:
|
||||
def lock = new CountDownLatch(1)
|
||||
def messageRef = new AtomicReference<TextMessage>()
|
||||
|
|
@ -159,115 +96,195 @@ class JMS1Test extends AgentTestRunner {
|
|||
}
|
||||
}
|
||||
|
||||
def message = session.createTextMessage("a message")
|
||||
producer.send(message)
|
||||
lock.countDown()
|
||||
TEST_WRITER.waitForTraces(2)
|
||||
|
||||
expect:
|
||||
messageRef.get().text == "a message"
|
||||
TEST_WRITER.size() == 2
|
||||
assertTraces(TEST_WRITER, 2) {
|
||||
producerTrace(it, 0, jmsResourceName)
|
||||
trace(1, 1) { // Consumer trace
|
||||
span(0) {
|
||||
childOf TEST_WRITER.firstTrace().get(2)
|
||||
serviceName "jms"
|
||||
operationName "jms.onMessage"
|
||||
resourceName "Received from $jmsResourceName"
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
|
||||
and: // producer trace
|
||||
def trace = TEST_WRITER.firstTrace()
|
||||
trace.size() == 3
|
||||
|
||||
and: // span 0
|
||||
def span0 = trace[0]
|
||||
|
||||
span0.context().operationName == "jms.produce"
|
||||
span0.serviceName == "jms"
|
||||
span0.resourceName == "Produced for $resourceName"
|
||||
span0.type == DDSpanTypes.MESSAGE_PRODUCER
|
||||
!span0.context().getErrorFlag()
|
||||
span0.context().parentId == 0
|
||||
|
||||
|
||||
def tags0 = span0.context().tags
|
||||
tags0["span.kind"] == "producer"
|
||||
tags0["component"] == "jms1"
|
||||
|
||||
tags0["span.origin.type"] == ActiveMQMessageProducer.name
|
||||
|
||||
tags0["thread.name"] != null
|
||||
tags0["thread.id"] != null
|
||||
tags0.size() == 6
|
||||
|
||||
and: // span 1
|
||||
def span1 = trace[1]
|
||||
|
||||
span1.context().operationName == "jms.produce"
|
||||
span1.serviceName == "jms"
|
||||
span1.resourceName == "Produced for $resourceName"
|
||||
span1.type == DDSpanTypes.MESSAGE_PRODUCER
|
||||
!span1.context().getErrorFlag()
|
||||
span1.context().parentId == span0.context().spanId
|
||||
|
||||
|
||||
def tags1 = span1.context().tags
|
||||
tags1["span.kind"] == "producer"
|
||||
tags1["component"] == "jms1"
|
||||
|
||||
tags1["span.origin.type"] == ActiveMQMessageProducer.name
|
||||
|
||||
tags1["thread.name"] != null
|
||||
tags1["thread.id"] != null
|
||||
tags1.size() == 6
|
||||
|
||||
and: // span 2
|
||||
def span2 = trace[2]
|
||||
|
||||
span2.context().operationName == "jms.produce"
|
||||
span2.serviceName == "jms"
|
||||
span2.resourceName == "Produced for $resourceName"
|
||||
span2.type == DDSpanTypes.MESSAGE_PRODUCER
|
||||
!span2.context().getErrorFlag()
|
||||
span2.context().parentId == span1.context().spanId
|
||||
|
||||
|
||||
def tags2 = span2.context().tags
|
||||
tags2["span.kind"] == "producer"
|
||||
tags2["component"] == "jms1"
|
||||
|
||||
tags2["span.origin.type"] == ActiveMQMessageProducer.name
|
||||
|
||||
tags2["thread.name"] != null
|
||||
tags2["thread.id"] != null
|
||||
tags2.size() == 6
|
||||
|
||||
and: // consumer trace
|
||||
def consumerTrace = TEST_WRITER.get(1)
|
||||
consumerTrace.size() == 1
|
||||
|
||||
def consumerSpan = consumerTrace[0]
|
||||
|
||||
consumerSpan.context().operationName == "jms.onMessage"
|
||||
consumerSpan.serviceName == "jms"
|
||||
consumerSpan.resourceName == "Received from $resourceName"
|
||||
consumerSpan.type == DDSpanTypes.MESSAGE_CONSUMER
|
||||
!consumerSpan.context().getErrorFlag()
|
||||
consumerSpan.context().parentId == span2.context().spanId
|
||||
|
||||
|
||||
def consumerTags = consumerSpan.context().tags
|
||||
consumerTags["span.kind"] == "consumer"
|
||||
consumerTags["component"] == "jms1"
|
||||
|
||||
consumerTags["span.origin.type"] != null
|
||||
|
||||
consumerTags["thread.name"] != null
|
||||
consumerTags["thread.id"] != null
|
||||
consumerTags.size() == 6
|
||||
tags {
|
||||
defaultTags()
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
|
||||
"${Tags.COMPONENT.key}" "jms1"
|
||||
"${Tags.SPAN_KIND.key}" "consumer"
|
||||
"span.origin.type" { t -> t.contains("JMS1Test") }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// This check needs to go after all traces have been accounted for
|
||||
messageRef.get().text == messageText
|
||||
|
||||
cleanup:
|
||||
producer.close()
|
||||
consumer.close()
|
||||
|
||||
where:
|
||||
destination | resourceName
|
||||
destination | jmsResourceName
|
||||
session.createQueue("someQueue") | "Queue someQueue"
|
||||
session.createTopic("someTopic") | "Topic someTopic"
|
||||
session.createTemporaryQueue() | "Temporary Queue"
|
||||
session.createTemporaryTopic() | "Temporary Topic"
|
||||
}
|
||||
|
||||
def "failing to receive message with receiveNoWait on #jmsResourceName works"() {
|
||||
setup:
|
||||
def consumer = session.createConsumer(destination)
|
||||
|
||||
// Receive with timeout
|
||||
TextMessage receivedMessage = consumer.receiveNoWait()
|
||||
|
||||
expect:
|
||||
receivedMessage == null
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 1) { // Consumer trace
|
||||
span(0) {
|
||||
parent()
|
||||
serviceName "jms"
|
||||
operationName "jms.consume"
|
||||
resourceName "JMS receiveNoWait"
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
|
||||
tags {
|
||||
defaultTags()
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
|
||||
"${Tags.COMPONENT.key}" "jms1"
|
||||
"${Tags.SPAN_KIND.key}" "consumer"
|
||||
"span.origin.type" ActiveMQMessageConsumer.name
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
consumer.close()
|
||||
|
||||
where:
|
||||
destination | jmsResourceName
|
||||
session.createQueue("someQueue") | "Queue someQueue"
|
||||
session.createTopic("someTopic") | "Topic someTopic"
|
||||
}
|
||||
|
||||
def "failing to receive message with wait(timeout) on #jmsResourceName works"() {
|
||||
setup:
|
||||
def consumer = session.createConsumer(destination)
|
||||
|
||||
// Receive with timeout
|
||||
TextMessage receivedMessage = consumer.receive(100)
|
||||
|
||||
expect:
|
||||
receivedMessage == null
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 1) { // Consumer trace
|
||||
span(0) {
|
||||
parent()
|
||||
serviceName "jms"
|
||||
operationName "jms.consume"
|
||||
resourceName "JMS receive"
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
|
||||
tags {
|
||||
defaultTags()
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
|
||||
"${Tags.COMPONENT.key}" "jms1"
|
||||
"${Tags.SPAN_KIND.key}" "consumer"
|
||||
"span.origin.type" ActiveMQMessageConsumer.name
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
consumer.close()
|
||||
|
||||
where:
|
||||
destination | jmsResourceName
|
||||
session.createQueue("someQueue") | "Queue someQueue"
|
||||
session.createTopic("someTopic") | "Topic someTopic"
|
||||
}
|
||||
|
||||
def producerTrace(ListWriterAssert writer, int index, String jmsResourceName) {
|
||||
writer.trace(index, 3) {
|
||||
span(0) {
|
||||
parent()
|
||||
serviceName "jms"
|
||||
operationName "jms.produce"
|
||||
resourceName "Produced for $jmsResourceName"
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
|
||||
tags {
|
||||
defaultTags()
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_PRODUCER
|
||||
"${Tags.COMPONENT.key}" "jms1"
|
||||
"${Tags.SPAN_KIND.key}" "producer"
|
||||
"span.origin.type" ActiveMQMessageProducer.name
|
||||
}
|
||||
}
|
||||
span(1) {
|
||||
childOf span(0)
|
||||
serviceName "jms"
|
||||
operationName "jms.produce"
|
||||
resourceName "Produced for $jmsResourceName"
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
|
||||
tags {
|
||||
defaultTags()
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_PRODUCER
|
||||
"${Tags.COMPONENT.key}" "jms1"
|
||||
"${Tags.SPAN_KIND.key}" "producer"
|
||||
"span.origin.type" ActiveMQMessageProducer.name
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
childOf span(1)
|
||||
serviceName "jms"
|
||||
operationName "jms.produce"
|
||||
resourceName "Produced for $jmsResourceName"
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
|
||||
tags {
|
||||
defaultTags()
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_PRODUCER
|
||||
"${Tags.COMPONENT.key}" "jms1"
|
||||
"${Tags.SPAN_KIND.key}" "producer"
|
||||
"span.origin.type" ActiveMQMessageProducer.name
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def consumerTrace(ListWriterAssert writer, int index, String jmsResourceName, origin) {
|
||||
writer.trace(index, 1) {
|
||||
span(0) {
|
||||
childOf TEST_WRITER.firstTrace().get(2)
|
||||
serviceName "jms"
|
||||
operationName "jms.onMessage"
|
||||
resourceName "Received from $jmsResourceName"
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
|
||||
tags {
|
||||
defaultTags()
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
|
||||
"${Tags.COMPONENT.key}" "jms1"
|
||||
"${Tags.SPAN_KIND.key}" "consumer"
|
||||
"span.origin.type" origin
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,9 +22,11 @@ import datadog.trace.instrumentation.jms.util.MessagePropertyTextMap;
|
|||
import io.opentracing.Scope;
|
||||
import io.opentracing.Span;
|
||||
import io.opentracing.SpanContext;
|
||||
import io.opentracing.Tracer;
|
||||
import io.opentracing.propagation.Format;
|
||||
import io.opentracing.tag.Tags;
|
||||
import io.opentracing.util.GlobalTracer;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.jms.Message;
|
||||
|
|
@ -54,7 +56,7 @@ public final class JMS2MessageConsumerInstrumentation extends Instrumenter.Confi
|
|||
.transform(
|
||||
DDAdvice.create()
|
||||
.advice(
|
||||
named("receive").and(takesArguments(0)).and(isPublic()),
|
||||
named("receive").and(takesArguments(0).or(takesArguments(1))).and(isPublic()),
|
||||
ConsumerAdvice.class.getName())
|
||||
.advice(
|
||||
named("receiveNoWait").and(takesArguments(0)).and(isPublic()),
|
||||
|
|
@ -73,31 +75,42 @@ public final class JMS2MessageConsumerInstrumentation extends Instrumenter.Confi
|
|||
public static void stopSpan(
|
||||
@Advice.This final MessageConsumer consumer,
|
||||
@Advice.Enter final long startTime,
|
||||
@Advice.Origin final Method method,
|
||||
@Advice.Return final Message message,
|
||||
@Advice.Thrown final Throwable throwable) {
|
||||
|
||||
final SpanContext extractedContext =
|
||||
GlobalTracer.get().extract(Format.Builtin.TEXT_MAP, new MessagePropertyTextMap(message));
|
||||
|
||||
final Scope scope =
|
||||
Tracer.SpanBuilder spanBuilder =
|
||||
GlobalTracer.get()
|
||||
.buildSpan("jms.consume")
|
||||
.asChildOf(extractedContext)
|
||||
.withTag(DDTags.SERVICE_NAME, "jms")
|
||||
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER)
|
||||
.withTag(Tags.COMPONENT.getKey(), "jms2")
|
||||
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER)
|
||||
.withTag("span.origin.type", consumer.getClass().getName())
|
||||
.withStartTimestamp(TimeUnit.MILLISECONDS.toMicros(startTime))
|
||||
.startActive(true);
|
||||
.withStartTimestamp(TimeUnit.MILLISECONDS.toMicros(startTime));
|
||||
|
||||
if (message == null) {
|
||||
spanBuilder = spanBuilder.withTag(DDTags.RESOURCE_NAME, "JMS " + method.getName());
|
||||
} else {
|
||||
spanBuilder =
|
||||
spanBuilder.withTag(
|
||||
DDTags.RESOURCE_NAME, "Consumed from " + toResourceName(message, null));
|
||||
|
||||
final SpanContext extractedContext =
|
||||
GlobalTracer.get()
|
||||
.extract(Format.Builtin.TEXT_MAP, new MessagePropertyTextMap(message));
|
||||
if (extractedContext != null) {
|
||||
spanBuilder = spanBuilder.asChildOf(extractedContext);
|
||||
}
|
||||
}
|
||||
|
||||
final Scope scope = spanBuilder.startActive(true);
|
||||
final Span span = scope.span();
|
||||
|
||||
if (throwable != null) {
|
||||
Tags.ERROR.set(span, Boolean.TRUE);
|
||||
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
|
||||
}
|
||||
span.setTag(DDTags.RESOURCE_NAME, "Consumed from " + toResourceName(message, null));
|
||||
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,9 @@
|
|||
import com.google.common.io.Files
|
||||
import datadog.trace.agent.test.AgentTestRunner
|
||||
import datadog.trace.agent.test.ListWriterAssert
|
||||
import datadog.trace.api.DDSpanTypes
|
||||
import datadog.trace.api.DDTags
|
||||
import io.opentracing.tag.Tags
|
||||
import org.hornetq.api.core.TransportConfiguration
|
||||
import org.hornetq.api.core.client.HornetQClient
|
||||
import org.hornetq.api.jms.HornetQJMSClient
|
||||
|
|
@ -23,7 +26,11 @@ import javax.jms.TextMessage
|
|||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
import static datadog.trace.agent.test.ListWriterAssert.assertTraces
|
||||
|
||||
class JMS2Test extends AgentTestRunner {
|
||||
@Shared
|
||||
String messageText = "a message"
|
||||
@Shared
|
||||
static Session session
|
||||
|
||||
|
|
@ -62,81 +69,53 @@ class JMS2Test extends AgentTestRunner {
|
|||
session.run()
|
||||
}
|
||||
|
||||
def "sending a message to #resourceName generates spans"() {
|
||||
def "sending a message to #jmsResourceName generates spans"() {
|
||||
setup:
|
||||
def producer = session.createProducer(destination)
|
||||
def consumer = session.createConsumer(destination)
|
||||
def message = session.createTextMessage("a message")
|
||||
def message = session.createTextMessage(messageText)
|
||||
|
||||
producer.send(message)
|
||||
|
||||
TextMessage receivedMessage = consumer.receive()
|
||||
|
||||
expect:
|
||||
receivedMessage.text == "a message"
|
||||
TEST_WRITER.size() == 2
|
||||
receivedMessage.text == messageText
|
||||
assertTraces(TEST_WRITER, 2) {
|
||||
producerTrace(it, 0, jmsResourceName)
|
||||
trace(1, 1) { // Consumer trace
|
||||
span(0) {
|
||||
childOf TEST_WRITER.firstTrace().get(0)
|
||||
serviceName "jms"
|
||||
operationName "jms.consume"
|
||||
resourceName "Consumed from $jmsResourceName"
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
|
||||
and: // producer trace
|
||||
def trace = TEST_WRITER.firstTrace()
|
||||
trace.size() == 1
|
||||
|
||||
def producerSpan = trace[0]
|
||||
|
||||
producerSpan.context().operationName == "jms.produce"
|
||||
producerSpan.serviceName == "jms"
|
||||
producerSpan.resourceName == "Produced for $resourceName"
|
||||
producerSpan.type == DDSpanTypes.MESSAGE_PRODUCER
|
||||
!producerSpan.context().getErrorFlag()
|
||||
producerSpan.context().parentId == 0
|
||||
|
||||
|
||||
def producerTags = producerSpan.context().tags
|
||||
producerTags["span.kind"] == "producer"
|
||||
producerTags["component"] == "jms2"
|
||||
|
||||
producerTags["span.origin.type"] == HornetQMessageProducer.name
|
||||
|
||||
producerTags["thread.name"] != null
|
||||
producerTags["thread.id"] != null
|
||||
producerTags.size() == 6
|
||||
|
||||
and: // consumer trace
|
||||
def consumerTrace = TEST_WRITER.get(1)
|
||||
consumerTrace.size() == 1
|
||||
|
||||
def consumerSpan = consumerTrace[0]
|
||||
|
||||
consumerSpan.context().operationName == "jms.consume"
|
||||
consumerSpan.serviceName == "jms"
|
||||
consumerSpan.resourceName == "Consumed from $resourceName"
|
||||
consumerSpan.type == DDSpanTypes.MESSAGE_CONSUMER
|
||||
!consumerSpan.context().getErrorFlag()
|
||||
consumerSpan.context().parentId == producerSpan.context().spanId
|
||||
|
||||
|
||||
def consumerTags = consumerSpan.context().tags
|
||||
consumerTags["span.kind"] == "consumer"
|
||||
consumerTags["component"] == "jms2"
|
||||
|
||||
consumerTags["span.origin.type"] == HornetQMessageConsumer.name
|
||||
|
||||
consumerTags["thread.name"] != null
|
||||
consumerTags["thread.id"] != null
|
||||
consumerTags.size() == 6
|
||||
tags {
|
||||
defaultTags()
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
|
||||
"${Tags.COMPONENT.key}" "jms2"
|
||||
"${Tags.SPAN_KIND.key}" "consumer"
|
||||
"span.origin.type" HornetQMessageConsumer.name
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
producer.close()
|
||||
consumer.close()
|
||||
|
||||
where:
|
||||
destination | resourceName
|
||||
destination | jmsResourceName
|
||||
session.createQueue("someQueue") | "Queue someQueue"
|
||||
session.createTopic("someTopic") | "Topic someTopic"
|
||||
session.createTemporaryQueue() | "Temporary Queue"
|
||||
session.createTemporaryTopic() | "Temporary Topic"
|
||||
}
|
||||
|
||||
def "sending to a MessageListener on #resourceName generates a span"() {
|
||||
def "sending to a MessageListener on #jmsResourceName generates a span"() {
|
||||
setup:
|
||||
def lock = new CountDownLatch(1)
|
||||
def messageRef = new AtomicReference<TextMessage>()
|
||||
|
|
@ -150,72 +129,164 @@ class JMS2Test extends AgentTestRunner {
|
|||
}
|
||||
}
|
||||
|
||||
def message = session.createTextMessage("a message")
|
||||
def message = session.createTextMessage(messageText)
|
||||
producer.send(message)
|
||||
lock.countDown()
|
||||
TEST_WRITER.waitForTraces(2)
|
||||
|
||||
expect:
|
||||
messageRef.get().text == "a message"
|
||||
TEST_WRITER.size() == 2
|
||||
assertTraces(TEST_WRITER, 2) {
|
||||
producerTrace(it, 0, jmsResourceName)
|
||||
trace(1, 1) { // Consumer trace
|
||||
span(0) {
|
||||
childOf TEST_WRITER.firstTrace().get(0)
|
||||
serviceName "jms"
|
||||
operationName "jms.onMessage"
|
||||
resourceName "Received from $jmsResourceName"
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
|
||||
and: // producer trace
|
||||
def trace = TEST_WRITER.firstTrace()
|
||||
trace.size() == 1
|
||||
|
||||
def producerSpan = trace[0]
|
||||
|
||||
producerSpan.context().operationName == "jms.produce"
|
||||
producerSpan.serviceName == "jms"
|
||||
producerSpan.resourceName == "Produced for $resourceName"
|
||||
producerSpan.type == DDSpanTypes.MESSAGE_PRODUCER
|
||||
!producerSpan.context().getErrorFlag()
|
||||
producerSpan.context().parentId == 0
|
||||
|
||||
|
||||
def producerTags = producerSpan.context().tags
|
||||
producerTags["span.kind"] == "producer"
|
||||
producerTags["component"] == "jms2"
|
||||
|
||||
producerTags["span.origin.type"] == HornetQMessageProducer.name
|
||||
|
||||
producerTags["thread.name"] != null
|
||||
producerTags["thread.id"] != null
|
||||
producerTags.size() == 6
|
||||
|
||||
and: // consumer trace
|
||||
def consumerTrace = TEST_WRITER.get(1)
|
||||
consumerTrace.size() == 1
|
||||
|
||||
def consumerSpan = consumerTrace[0]
|
||||
|
||||
consumerSpan.context().operationName == "jms.onMessage"
|
||||
consumerSpan.serviceName == "jms"
|
||||
consumerSpan.resourceName == "Received from $resourceName"
|
||||
consumerSpan.type == DDSpanTypes.MESSAGE_CONSUMER
|
||||
!consumerSpan.context().getErrorFlag()
|
||||
consumerSpan.context().parentId == producerSpan.context().spanId
|
||||
|
||||
|
||||
def consumerTags = consumerSpan.context().tags
|
||||
consumerTags["span.kind"] == "consumer"
|
||||
consumerTags["component"] == "jms2"
|
||||
|
||||
consumerTags["span.origin.type"] != null
|
||||
|
||||
consumerTags["thread.name"] != null
|
||||
consumerTags["thread.id"] != null
|
||||
consumerTags.size() == 6
|
||||
tags {
|
||||
defaultTags()
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
|
||||
"${Tags.COMPONENT.key}" "jms2"
|
||||
"${Tags.SPAN_KIND.key}" "consumer"
|
||||
"span.origin.type" { t -> t.contains("JMS2Test") }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// This check needs to go after all traces have been accounted for
|
||||
messageRef.get().text == messageText
|
||||
|
||||
cleanup:
|
||||
producer.close()
|
||||
consumer.close()
|
||||
|
||||
where:
|
||||
destination | resourceName
|
||||
destination | jmsResourceName
|
||||
session.createQueue("someQueue") | "Queue someQueue"
|
||||
session.createTopic("someTopic") | "Topic someTopic"
|
||||
session.createTemporaryQueue() | "Temporary Queue"
|
||||
session.createTemporaryTopic() | "Temporary Topic"
|
||||
}
|
||||
|
||||
def "failing to receive message with receiveNoWait on #jmsResourceName works"() {
|
||||
setup:
|
||||
def consumer = session.createConsumer(destination)
|
||||
|
||||
// Receive with timeout
|
||||
TextMessage receivedMessage = consumer.receiveNoWait()
|
||||
|
||||
expect:
|
||||
receivedMessage == null
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 1) { // Consumer trace
|
||||
span(0) {
|
||||
parent()
|
||||
serviceName "jms"
|
||||
operationName "jms.consume"
|
||||
resourceName "JMS receiveNoWait"
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
|
||||
tags {
|
||||
defaultTags()
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
|
||||
"${Tags.COMPONENT.key}" "jms2"
|
||||
"${Tags.SPAN_KIND.key}" "consumer"
|
||||
"span.origin.type" HornetQMessageConsumer.name
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
consumer.close()
|
||||
|
||||
where:
|
||||
destination | jmsResourceName
|
||||
session.createQueue("someQueue") | "Queue someQueue"
|
||||
session.createTopic("someTopic") | "Topic someTopic"
|
||||
}
|
||||
|
||||
def "failing to receive message with wait(timeout) on #jmsResourceName works"() {
|
||||
setup:
|
||||
def consumer = session.createConsumer(destination)
|
||||
|
||||
// Receive with timeout
|
||||
TextMessage receivedMessage = consumer.receive(100)
|
||||
|
||||
expect:
|
||||
receivedMessage == null
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 1) { // Consumer trace
|
||||
span(0) {
|
||||
parent()
|
||||
serviceName "jms"
|
||||
operationName "jms.consume"
|
||||
resourceName "JMS receive"
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
|
||||
tags {
|
||||
defaultTags()
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
|
||||
"${Tags.COMPONENT.key}" "jms2"
|
||||
"${Tags.SPAN_KIND.key}" "consumer"
|
||||
"span.origin.type" HornetQMessageConsumer.name
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
consumer.close()
|
||||
|
||||
where:
|
||||
destination | jmsResourceName
|
||||
session.createQueue("someQueue") | "Queue someQueue"
|
||||
session.createTopic("someTopic") | "Topic someTopic"
|
||||
}
|
||||
|
||||
def producerTrace(ListWriterAssert writer, int index, String jmsResourceName) {
|
||||
writer.trace(index, 1) {
|
||||
span(0) {
|
||||
parent()
|
||||
serviceName "jms"
|
||||
operationName "jms.produce"
|
||||
resourceName "Produced for $jmsResourceName"
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
|
||||
tags {
|
||||
defaultTags()
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_PRODUCER
|
||||
"${Tags.COMPONENT.key}" "jms2"
|
||||
"${Tags.SPAN_KIND.key}" "producer"
|
||||
"span.origin.type" HornetQMessageProducer.name
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def consumerTrace(ListWriterAssert writer, int index, String jmsResourceName, origin) {
|
||||
writer.trace(index, 1) {
|
||||
span(0) {
|
||||
childOf TEST_WRITER.firstTrace().get(2)
|
||||
serviceName "jms"
|
||||
operationName "jms.onMessage"
|
||||
resourceName "Received from $jmsResourceName"
|
||||
spanType DDSpanTypes.MESSAGE_PRODUCER
|
||||
errored false
|
||||
|
||||
tags {
|
||||
defaultTags()
|
||||
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
|
||||
"${Tags.COMPONENT.key}" "jms2"
|
||||
"${Tags.SPAN_KIND.key}" "consumer"
|
||||
"span.origin.type" origin
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue