JMS2: Fix `receiveNoWait` instrumentation and instrument `receive(timeout)`

This commit is contained in:
Nikolay Martynov 2018-06-20 14:24:05 -04:00
parent 3be5a78a53
commit 37fdbf9188
2 changed files with 201 additions and 115 deletions

View File

@ -22,9 +22,11 @@ import datadog.trace.instrumentation.jms.util.MessagePropertyTextMap;
import io.opentracing.Scope; import io.opentracing.Scope;
import io.opentracing.Span; import io.opentracing.Span;
import io.opentracing.SpanContext; import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format; import io.opentracing.propagation.Format;
import io.opentracing.tag.Tags; import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer; import io.opentracing.util.GlobalTracer;
import java.lang.reflect.Method;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.jms.Message; import javax.jms.Message;
@ -54,7 +56,7 @@ public final class JMS2MessageConsumerInstrumentation extends Instrumenter.Confi
.transform( .transform(
DDAdvice.create() DDAdvice.create()
.advice( .advice(
named("receive").and(takesArguments(0)).and(isPublic()), named("receive").and(takesArguments(0).or(takesArguments(1))).and(isPublic()),
ConsumerAdvice.class.getName()) ConsumerAdvice.class.getName())
.advice( .advice(
named("receiveNoWait").and(takesArguments(0)).and(isPublic()), named("receiveNoWait").and(takesArguments(0)).and(isPublic()),
@ -73,31 +75,44 @@ public final class JMS2MessageConsumerInstrumentation extends Instrumenter.Confi
public static void stopSpan( public static void stopSpan(
@Advice.This final MessageConsumer consumer, @Advice.This final MessageConsumer consumer,
@Advice.Enter final long startTime, @Advice.Enter final long startTime,
@Advice.Origin final Method method,
@Advice.Return final Message message, @Advice.Return final Message message,
@Advice.Thrown final Throwable throwable) { @Advice.Thrown final Throwable throwable) {
Tracer.SpanBuilder spanBuilder =
final SpanContext extractedContext =
GlobalTracer.get().extract(Format.Builtin.TEXT_MAP, new MessagePropertyTextMap(message));
final Scope scope =
GlobalTracer.get() GlobalTracer.get()
.buildSpan("jms.consume") .buildSpan("jms.consume")
.asChildOf(extractedContext)
.withTag(DDTags.SERVICE_NAME, "jms") .withTag(DDTags.SERVICE_NAME, "jms")
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER) .withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER)
.withTag(Tags.COMPONENT.getKey(), "jms2") .withTag(Tags.COMPONENT.getKey(), "jms2")
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER) .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER)
.withTag("span.origin.type", consumer.getClass().getName()) .withTag("span.origin.type", consumer.getClass().getName())
.withStartTimestamp(TimeUnit.MILLISECONDS.toMicros(startTime)) .withStartTimestamp(TimeUnit.MILLISECONDS.toMicros(startTime));
.startActive(true);
String resourceNamePrefix = "JMS " + method.getName() + ": ";
if (message == null) {
spanBuilder = spanBuilder.withTag(DDTags.RESOURCE_NAME, resourceNamePrefix + "no message");
} else {
spanBuilder =
spanBuilder.withTag(
DDTags.RESOURCE_NAME,
resourceNamePrefix + "consumed from " + toResourceName(message, null));
final SpanContext extractedContext =
GlobalTracer.get()
.extract(Format.Builtin.TEXT_MAP, new MessagePropertyTextMap(message));
if (extractedContext != null) {
spanBuilder = spanBuilder.asChildOf(extractedContext);
}
}
final Scope scope = spanBuilder.startActive(true);
final Span span = scope.span(); final Span span = scope.span();
if (throwable != null) { if (throwable != null) {
Tags.ERROR.set(span, Boolean.TRUE); Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable)); span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
} }
span.setTag(DDTags.RESOURCE_NAME, "Consumed from " + toResourceName(message, null));
scope.close(); scope.close();
} }
} }

View File

@ -1,6 +1,9 @@
import com.google.common.io.Files import com.google.common.io.Files
import datadog.trace.agent.test.AgentTestRunner import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.ListWriterAssert
import datadog.trace.api.DDSpanTypes import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import io.opentracing.tag.Tags
import org.hornetq.api.core.TransportConfiguration import org.hornetq.api.core.TransportConfiguration
import org.hornetq.api.core.client.HornetQClient import org.hornetq.api.core.client.HornetQClient
import org.hornetq.api.jms.HornetQJMSClient import org.hornetq.api.jms.HornetQJMSClient
@ -23,7 +26,11 @@ import javax.jms.TextMessage
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import static datadog.trace.agent.test.ListWriterAssert.assertTraces
class JMS2Test extends AgentTestRunner { class JMS2Test extends AgentTestRunner {
@Shared
String messageText = "a message"
@Shared @Shared
static Session session static Session session
@ -62,81 +69,53 @@ class JMS2Test extends AgentTestRunner {
session.run() session.run()
} }
def "sending a message to #resourceName generates spans"() { def "sending a message to #jmsResourceName generates spans"() {
setup: setup:
def producer = session.createProducer(destination) def producer = session.createProducer(destination)
def consumer = session.createConsumer(destination) def consumer = session.createConsumer(destination)
def message = session.createTextMessage("a message") def message = session.createTextMessage(messageText)
producer.send(message) producer.send(message)
TextMessage receivedMessage = consumer.receive() TextMessage receivedMessage = consumer.receive()
expect: expect:
receivedMessage.text == "a message" receivedMessage.text == messageText
TEST_WRITER.size() == 2 assertTraces(TEST_WRITER, 2) {
producerTrace(it, 0, jmsResourceName)
trace(1, 1) { // Consumer trace
span(0) {
childOf TEST_WRITER.firstTrace().get(0)
serviceName "jms"
operationName "jms.consume"
resourceName "JMS receive: consumed from $jmsResourceName"
spanType DDSpanTypes.MESSAGE_PRODUCER
errored false
and: // producer trace tags {
def trace = TEST_WRITER.firstTrace() defaultTags()
trace.size() == 1 "${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
"${Tags.COMPONENT.key}" "jms2"
def producerSpan = trace[0] "${Tags.SPAN_KIND.key}" "consumer"
"span.origin.type" HornetQMessageConsumer.name
producerSpan.context().operationName == "jms.produce" }
producerSpan.serviceName == "jms" }
producerSpan.resourceName == "Produced for $resourceName" }
producerSpan.type == DDSpanTypes.MESSAGE_PRODUCER }
!producerSpan.context().getErrorFlag()
producerSpan.context().parentId == 0
def producerTags = producerSpan.context().tags
producerTags["span.kind"] == "producer"
producerTags["component"] == "jms2"
producerTags["span.origin.type"] == HornetQMessageProducer.name
producerTags["thread.name"] != null
producerTags["thread.id"] != null
producerTags.size() == 6
and: // consumer trace
def consumerTrace = TEST_WRITER.get(1)
consumerTrace.size() == 1
def consumerSpan = consumerTrace[0]
consumerSpan.context().operationName == "jms.consume"
consumerSpan.serviceName == "jms"
consumerSpan.resourceName == "Consumed from $resourceName"
consumerSpan.type == DDSpanTypes.MESSAGE_CONSUMER
!consumerSpan.context().getErrorFlag()
consumerSpan.context().parentId == producerSpan.context().spanId
def consumerTags = consumerSpan.context().tags
consumerTags["span.kind"] == "consumer"
consumerTags["component"] == "jms2"
consumerTags["span.origin.type"] == HornetQMessageConsumer.name
consumerTags["thread.name"] != null
consumerTags["thread.id"] != null
consumerTags.size() == 6
cleanup: cleanup:
producer.close() producer.close()
consumer.close() consumer.close()
where: where:
destination | resourceName destination | jmsResourceName
session.createQueue("someQueue") | "Queue someQueue" session.createQueue("someQueue") | "Queue someQueue"
session.createTopic("someTopic") | "Topic someTopic" session.createTopic("someTopic") | "Topic someTopic"
session.createTemporaryQueue() | "Temporary Queue" session.createTemporaryQueue() | "Temporary Queue"
session.createTemporaryTopic() | "Temporary Topic" session.createTemporaryTopic() | "Temporary Topic"
} }
def "sending to a MessageListener on #resourceName generates a span"() { def "sending to a MessageListener on #jmsResourceName generates a span"() {
setup: setup:
def lock = new CountDownLatch(1) def lock = new CountDownLatch(1)
def messageRef = new AtomicReference<TextMessage>() def messageRef = new AtomicReference<TextMessage>()
@ -150,72 +129,164 @@ class JMS2Test extends AgentTestRunner {
} }
} }
def message = session.createTextMessage("a message") def message = session.createTextMessage(messageText)
producer.send(message) producer.send(message)
lock.countDown() lock.countDown()
TEST_WRITER.waitForTraces(2)
expect: expect:
messageRef.get().text == "a message" assertTraces(TEST_WRITER, 2) {
TEST_WRITER.size() == 2 producerTrace(it, 0, jmsResourceName)
trace(1, 1) { // Consumer trace
span(0) {
childOf TEST_WRITER.firstTrace().get(0)
serviceName "jms"
operationName "jms.onMessage"
resourceName "Received from $jmsResourceName"
spanType DDSpanTypes.MESSAGE_PRODUCER
errored false
and: // producer trace tags {
def trace = TEST_WRITER.firstTrace() defaultTags()
trace.size() == 1 "${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
"${Tags.COMPONENT.key}" "jms2"
def producerSpan = trace[0] "${Tags.SPAN_KIND.key}" "consumer"
"span.origin.type" { t -> t.contains("JMS2Test") }
producerSpan.context().operationName == "jms.produce" }
producerSpan.serviceName == "jms" }
producerSpan.resourceName == "Produced for $resourceName" }
producerSpan.type == DDSpanTypes.MESSAGE_PRODUCER }
!producerSpan.context().getErrorFlag() // This check needs to go after all traces have been accounted for
producerSpan.context().parentId == 0 messageRef.get().text == messageText
def producerTags = producerSpan.context().tags
producerTags["span.kind"] == "producer"
producerTags["component"] == "jms2"
producerTags["span.origin.type"] == HornetQMessageProducer.name
producerTags["thread.name"] != null
producerTags["thread.id"] != null
producerTags.size() == 6
and: // consumer trace
def consumerTrace = TEST_WRITER.get(1)
consumerTrace.size() == 1
def consumerSpan = consumerTrace[0]
consumerSpan.context().operationName == "jms.onMessage"
consumerSpan.serviceName == "jms"
consumerSpan.resourceName == "Received from $resourceName"
consumerSpan.type == DDSpanTypes.MESSAGE_CONSUMER
!consumerSpan.context().getErrorFlag()
consumerSpan.context().parentId == producerSpan.context().spanId
def consumerTags = consumerSpan.context().tags
consumerTags["span.kind"] == "consumer"
consumerTags["component"] == "jms2"
consumerTags["span.origin.type"] != null
consumerTags["thread.name"] != null
consumerTags["thread.id"] != null
consumerTags.size() == 6
cleanup: cleanup:
producer.close() producer.close()
consumer.close() consumer.close()
where: where:
destination | resourceName destination | jmsResourceName
session.createQueue("someQueue") | "Queue someQueue" session.createQueue("someQueue") | "Queue someQueue"
session.createTopic("someTopic") | "Topic someTopic" session.createTopic("someTopic") | "Topic someTopic"
session.createTemporaryQueue() | "Temporary Queue" session.createTemporaryQueue() | "Temporary Queue"
session.createTemporaryTopic() | "Temporary Topic" session.createTemporaryTopic() | "Temporary Topic"
} }
def "failing to receive message with receiveNoWait on #jmsResourceName works"() {
setup:
def consumer = session.createConsumer(destination)
// Receive with timeout
TextMessage receivedMessage = consumer.receiveNoWait()
expect:
receivedMessage == null
assertTraces(TEST_WRITER, 1) {
trace(0, 1) { // Consumer trace
span(0) {
parent()
serviceName "jms"
operationName "jms.consume"
resourceName "JMS receiveNoWait: no message"
spanType DDSpanTypes.MESSAGE_PRODUCER
errored false
tags {
defaultTags()
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
"${Tags.COMPONENT.key}" "jms2"
"${Tags.SPAN_KIND.key}" "consumer"
"span.origin.type" HornetQMessageConsumer.name
}
}
}
}
cleanup:
consumer.close()
where:
destination | jmsResourceName
session.createQueue("someQueue") | "Queue someQueue"
session.createTopic("someTopic") | "Topic someTopic"
}
def "failing to receive message with wait(timeout) on #jmsResourceName works"() {
setup:
def consumer = session.createConsumer(destination)
// Receive with timeout
TextMessage receivedMessage = consumer.receive(100)
expect:
receivedMessage == null
assertTraces(TEST_WRITER, 1) {
trace(0, 1) { // Consumer trace
span(0) {
parent()
serviceName "jms"
operationName "jms.consume"
resourceName "JMS receive: no message"
spanType DDSpanTypes.MESSAGE_PRODUCER
errored false
tags {
defaultTags()
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
"${Tags.COMPONENT.key}" "jms2"
"${Tags.SPAN_KIND.key}" "consumer"
"span.origin.type" HornetQMessageConsumer.name
}
}
}
}
cleanup:
consumer.close()
where:
destination | jmsResourceName
session.createQueue("someQueue") | "Queue someQueue"
session.createTopic("someTopic") | "Topic someTopic"
}
def producerTrace(ListWriterAssert writer, int index, String jmsResourceName) {
writer.trace(index, 1) {
span(0) {
parent()
serviceName "jms"
operationName "jms.produce"
resourceName "Produced for $jmsResourceName"
spanType DDSpanTypes.MESSAGE_PRODUCER
errored false
tags {
defaultTags()
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_PRODUCER
"${Tags.COMPONENT.key}" "jms2"
"${Tags.SPAN_KIND.key}" "producer"
"span.origin.type" HornetQMessageProducer.name
}
}
}
}
def consumerTrace(ListWriterAssert writer, int index, String jmsResourceName, origin) {
writer.trace(index, 1) {
span(0) {
childOf TEST_WRITER.firstTrace().get(2)
serviceName "jms"
operationName "jms.onMessage"
resourceName "Received from $jmsResourceName"
spanType DDSpanTypes.MESSAGE_PRODUCER
errored false
tags {
defaultTags()
"${DDTags.SPAN_TYPE}" DDSpanTypes.MESSAGE_CONSUMER
"${Tags.COMPONENT.key}" "jms2"
"${Tags.SPAN_KIND.key}" "consumer"
"span.origin.type" origin
}
}
}
}
} }