Change link behavior for consumers (#198)

This commit is contained in:
Trask Stalnaker 2020-03-02 16:29:25 -08:00 committed by GitHub
parent 9b73e85bef
commit fa72243966
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 232 additions and 104 deletions

View File

@ -44,6 +44,8 @@ import javax.jms.TextMessage
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference
import static io.opentelemetry.auto.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.auto.test.utils.TraceUtils.runUnderTrace
import static io.opentelemetry.trace.Span.Kind.CLIENT
import static io.opentelemetry.trace.Span.Kind.CONSUMER
import static io.opentelemetry.trace.Span.Kind.PRODUCER
@ -107,14 +109,47 @@ class JMS2Test extends AgentTestRunner {
TextMessage receivedMessage = consumer.receive()
expect:
receivedMessage.text == messageText
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, jmsResourceName)
consumerSpan(it, 1, jmsResourceName, false, HornetQMessageConsumer, span(0))
}
}
cleanup:
producer.close()
consumer.close()
where:
destination | jmsResourceName
session.createQueue("someQueue") | "Queue someQueue"
session.createTopic("someTopic") | "Topic someTopic"
session.createTemporaryQueue() | "Temporary Queue"
session.createTemporaryTopic() | "Temporary Topic"
}
def "sending a message to #jmsResourceName and receive under existing parent generates link"() {
setup:
def producer = session.createProducer(destination)
def consumer = session.createConsumer(destination)
producer.send(message)
TextMessage receivedMessage = runUnderTrace("parent") {
consumer.receive()
}
expect:
receivedMessage.text == messageText
assertTraces(2) {
trace(0, 1) {
producerSpan(it, 0, jmsResourceName)
}
trace(1, 1) {
consumerSpan(it, 0, jmsResourceName, false, HornetQMessageConsumer, traces[0][0])
trace(1, 2) {
basicSpan(it, 0, "parent")
consumerSpan(it, 1, jmsResourceName, false, HornetQMessageConsumer, span(0), traces[0][0])
}
}
@ -257,17 +292,19 @@ class JMS2Test extends AgentTestRunner {
}
}
static consumerSpan(TraceAssert trace, int index, String jmsResourceName, boolean messageListener, Class origin, Object parentOrLinkSpan) {
static consumerSpan(TraceAssert trace, int index, String jmsResourceName, boolean messageListener, Class origin, Object parentSpan, Object linkSpan = null) {
trace.span(index) {
if (messageListener) {
operationName "jms.onMessage"
childOf((SpanData) parentOrLinkSpan)
spanKind CONSUMER
} else {
operationName "jms.consume"
hasLink((SpanData) parentOrLinkSpan)
spanKind CLIENT
}
childOf((SpanData) parentSpan)
if (linkSpan) {
hasLink((SpanData) linkSpan)
}
errored false
tags {

View File

@ -36,6 +36,8 @@ import java.util.concurrent.TimeUnit
import static JMS2Test.consumerSpan
import static JMS2Test.producerSpan
import static io.opentelemetry.auto.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.auto.test.utils.TraceUtils.runUnderTrace
class SpringTemplateJMS2Test extends AgentTestRunner {
@Shared
@ -87,14 +89,36 @@ class SpringTemplateJMS2Test extends AgentTestRunner {
template.convertAndSend(destination, messageText)
TextMessage receivedMessage = template.receive(destination)
expect:
receivedMessage.text == messageText
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, jmsResourceName)
consumerSpan(it, 1, jmsResourceName, false, HornetQMessageConsumer, span(0))
}
}
where:
destination | jmsResourceName
session.createQueue("someSpringQueue") | "Queue someSpringQueue"
}
def "sending a message to #jmsResourceName and receive under existing parent generates link"() {
setup:
template.convertAndSend(destination, messageText)
TextMessage receivedMessage = runUnderTrace("parent") {
template.receive(destination)
}
expect:
receivedMessage.text == messageText
assertTraces(2) {
trace(0, 1) {
producerSpan(it, 0, jmsResourceName)
}
trace(1, 1) {
consumerSpan(it, 0, jmsResourceName, false, HornetQMessageConsumer, traces[0][0])
trace(1, 2) {
basicSpan(it, 0, "parent")
consumerSpan(it, 1, jmsResourceName, false, HornetQMessageConsumer, span(0), traces[0][0])
}
}
@ -120,18 +144,14 @@ class SpringTemplateJMS2Test extends AgentTestRunner {
expect:
receivedMessage.text == "responded!"
assertTraces(4) {
trace(0, 1) {
assertTraces(2) {
trace(0, 2) {
producerSpan(it, 0, jmsResourceName)
consumerSpan(it, 1, jmsResourceName, false, HornetQMessageConsumer, span(0))
}
trace(1, 1) {
consumerSpan(it, 0, jmsResourceName, false, HornetQMessageConsumer, traces[0][0])
}
trace(2, 1) {
trace(1, 2) {
producerSpan(it, 0, "Temporary Queue") // receive doesn't propagate the trace, so this is a root
}
trace(3, 1) {
consumerSpan(it, 0, "Temporary Queue", false, HornetQMessageConsumer, traces[2][0])
consumerSpan(it, 1, "Temporary Queue", false, HornetQMessageConsumer, span(0))
}
}

View File

@ -30,6 +30,7 @@ import com.google.auto.service.AutoService;
import io.opentelemetry.auto.tooling.Instrumenter;
import io.opentelemetry.context.Scope;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.SpanContext;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
@ -98,11 +99,19 @@ public final class JMSMessageConsumerInstrumentation extends Instrumenter.Defaul
.setSpanKind(CLIENT)
.setStartTimestamp(TimeUnit.MILLISECONDS.toNanos(startTime));
if (message != null) {
SpanContext spanContext = null;
try {
spanBuilder.addLink(TRACER.getHttpTextFormat().extract(message, GETTER));
spanContext = TRACER.getHttpTextFormat().extract(message, GETTER);
} catch (final IllegalArgumentException e) {
// Couldn't extract a context
}
if (spanContext != null) {
if (TRACER.getCurrentSpan().getContext().isValid()) {
spanBuilder.addLink(spanContext);
} else {
spanBuilder.setParent(spanContext);
}
}
}
final Span span = spanBuilder.startSpan();
span.setAttribute("span.origin.type", consumer.getClass().getName());

View File

@ -34,6 +34,8 @@ import javax.jms.TextMessage
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference
import static io.opentelemetry.auto.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.auto.test.utils.TraceUtils.runUnderTrace
import static io.opentelemetry.trace.Span.Kind.CLIENT
import static io.opentelemetry.trace.Span.Kind.CONSUMER
import static io.opentelemetry.trace.Span.Kind.PRODUCER
@ -70,14 +72,47 @@ class JMS1Test extends AgentTestRunner {
TextMessage receivedMessage = consumer.receive()
expect:
receivedMessage.text == messageText
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, jmsResourceName)
consumerSpan(it, 1, jmsResourceName, false, ActiveMQMessageConsumer, span(0))
}
}
cleanup:
producer.close()
consumer.close()
where:
destination | jmsResourceName
session.createQueue("someQueue") | "Queue someQueue"
session.createTopic("someTopic") | "Topic someTopic"
session.createTemporaryQueue() | "Temporary Queue"
session.createTemporaryTopic() | "Temporary Topic"
}
def "sending a message to #jmsResourceName and receive under existing parent generates link"() {
setup:
def producer = session.createProducer(destination)
def consumer = session.createConsumer(destination)
producer.send(message)
TextMessage receivedMessage = runUnderTrace("parent") {
consumer.receive()
}
expect:
receivedMessage.text == messageText
assertTraces(2) {
trace(0, 1) {
producerSpan(it, 0, jmsResourceName)
}
trace(1, 1) {
consumerSpan(it, 0, jmsResourceName, false, ActiveMQMessageConsumer, traces[0][0])
trace(1, 2) {
basicSpan(it, 0, "parent")
consumerSpan(it, 1, jmsResourceName, false, ActiveMQMessageConsumer, span(0), traces[0][0])
}
}
@ -274,17 +309,18 @@ class JMS1Test extends AgentTestRunner {
}
}
static consumerSpan(TraceAssert trace, int index, String jmsResourceName, boolean messageListener, Class origin, Object parentOrLinkedSpan) {
static consumerSpan(TraceAssert trace, int index, String jmsResourceName, boolean messageListener, Class origin, Object parentSpan, Object linkSpan = null) {
trace.span(index) {
if (messageListener) {
operationName "jms.onMessage"
spanKind CONSUMER
childOf((SpanData) parentOrLinkedSpan)
} else {
operationName "jms.consume"
spanKind CLIENT
parent()
hasLink((SpanData) parentOrLinkedSpan)
}
childOf((SpanData) parentSpan)
if (linkSpan) {
hasLink((SpanData) linkSpan)
}
errored false
tags {

View File

@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit
import static JMS1Test.consumerSpan
import static JMS1Test.producerSpan
import static io.opentelemetry.auto.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.auto.test.utils.TraceUtils.runUnderTrace
class SpringTemplateJMS1Test extends AgentTestRunner {
@Shared
@ -61,14 +63,36 @@ class SpringTemplateJMS1Test extends AgentTestRunner {
template.convertAndSend(destination, messageText)
TextMessage receivedMessage = template.receive(destination)
expect:
receivedMessage.text == messageText
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, jmsResourceName)
consumerSpan(it, 1, jmsResourceName, false, ActiveMQMessageConsumer, span(0))
}
}
where:
destination | jmsResourceName
session.createQueue("someSpringQueue") | "Queue someSpringQueue"
}
def "sending a message to #jmsResourceName and receive under existing parent generates link"() {
setup:
template.convertAndSend(destination, messageText)
TextMessage receivedMessage = runUnderTrace("parent") {
template.receive(destination)
}
expect:
receivedMessage.text == messageText
assertTraces(2) {
trace(0, 1) {
producerSpan(it, 0, jmsResourceName)
}
trace(1, 1) {
consumerSpan(it, 0, jmsResourceName, false, ActiveMQMessageConsumer, traces[0][0])
trace(1, 2) {
basicSpan(it, 0, "parent")
consumerSpan(it, 1, jmsResourceName, false, ActiveMQMessageConsumer, span(0), traces[0][0])
}
}
@ -98,18 +122,14 @@ class SpringTemplateJMS1Test extends AgentTestRunner {
expect:
receivedMessage.text == "responded!"
assertTraces(4) {
trace(0, 1) {
assertTraces(2) {
trace(0, 2) {
producerSpan(it, 0, jmsResourceName)
consumerSpan(it, 1, jmsResourceName, false, ActiveMQMessageConsumer, span(0))
}
trace(1, 1) {
consumerSpan(it, 0, jmsResourceName, false, ActiveMQMessageConsumer, traces[0][0])
}
trace(2, 1) {
trace(1, 2) {
producerSpan(it, 0, "Temporary Queue") // receive doesn't propagate the trace, so this is a root
}
trace(3, 1) {
consumerSpan(it, 0, "Temporary Queue", false, ActiveMQMessageConsumer, traces[2][0])
consumerSpan(it, 1, "Temporary Queue", false, ActiveMQMessageConsumer, span(0))
}
}

View File

@ -21,6 +21,7 @@ import static io.opentelemetry.trace.Span.Kind.CONSUMER;
import io.opentelemetry.auto.instrumentation.api.SpanWithScope;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.SpanContext;
import java.util.Iterator;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -70,11 +71,19 @@ public class TracingIterator implements Iterator<ConsumerRecord> {
try {
if (next != null) {
final Span.Builder spanBuilder = TRACER.spanBuilder(operationName).setSpanKind(CONSUMER);
SpanContext spanContext = null;
try {
spanBuilder.addLink(TRACER.getHttpTextFormat().extract(next.headers(), GETTER));
spanContext = TRACER.getHttpTextFormat().extract(next.headers(), GETTER);
} catch (final IllegalArgumentException e) {
// Couldn't extract a context
}
if (spanContext != null) {
if (TRACER.getCurrentSpan().getContext().isValid()) {
spanBuilder.addLink(spanContext);
} else {
spanBuilder.setParent(spanContext);
}
}
final Span span = spanBuilder.startSpan();
decorator.afterStart(span);
decorator.onConsume(span, next);

View File

@ -96,8 +96,8 @@ class KafkaClientTest extends AgentTestRunner {
received.value() == greeting
received.key() == null
assertTraces(2) {
trace(0, 1) {
assertTraces(1) {
trace(0, 2) {
span(0) {
operationName "kafka.produce"
spanKind PRODUCER
@ -110,13 +110,11 @@ class KafkaClientTest extends AgentTestRunner {
"$Tags.COMPONENT" "java-kafka"
}
}
}
trace(1, 1) {
span(0) {
span(1) {
operationName "kafka.consume"
spanKind CONSUMER
errored false
hasLink traces[0][0]
childOf span(0)
tags {
"$MoreTags.SERVICE_NAME" "kafka"
"$MoreTags.RESOURCE_NAME" "Consume Topic $SHARED_TOPIC"
@ -169,8 +167,8 @@ class KafkaClientTest extends AgentTestRunner {
first.value() == greeting
first.key() == null
assertTraces(2) {
trace(0, 1) {
assertTraces(1) {
trace(0, 2) {
span(0) {
operationName "kafka.produce"
spanKind PRODUCER
@ -184,13 +182,11 @@ class KafkaClientTest extends AgentTestRunner {
"kafka.partition" { it >= 0 }
}
}
}
trace(1, 1) {
span(0) {
span(1) {
operationName "kafka.consume"
spanKind CONSUMER
errored false
hasLink traces[0][0]
childOf span(0)
tags {
"$MoreTags.SERVICE_NAME" "kafka"
"$MoreTags.RESOURCE_NAME" "Consume Topic $SHARED_TOPIC"

View File

@ -134,8 +134,8 @@ class KafkaStreamsTest extends AgentTestRunner {
received.value() == greeting.toLowerCase()
received.key() == null
assertTraces(3) {
trace(0, 3) {
assertTraces(1) {
trace(0, 5) {
// PRODUCER span 0
span(0) {
operationName "kafka.produce"
@ -149,8 +149,23 @@ class KafkaStreamsTest extends AgentTestRunner {
"$Tags.COMPONENT" "java-kafka"
}
}
// STREAMING span 1
// CONSUMER span 0
span(1) {
operationName "kafka.consume"
spanKind CONSUMER
errored false
childOf span(0)
tags {
"$MoreTags.SERVICE_NAME" "kafka"
"$MoreTags.RESOURCE_NAME" "Consume Topic $STREAM_PENDING"
"$MoreTags.SPAN_TYPE" "queue"
"$Tags.COMPONENT" "java-kafka"
"partition" { it >= 0 }
"offset" 0
}
}
// STREAMING span 1
span(2) {
operationName "kafka.consume"
spanKind CONSUMER
errored false
@ -166,11 +181,11 @@ class KafkaStreamsTest extends AgentTestRunner {
}
}
// STREAMING span 0
span(2) {
span(3) {
operationName "kafka.produce"
spanKind PRODUCER
errored false
childOf span(1)
childOf span(2)
tags {
"$MoreTags.SERVICE_NAME" "kafka"
"$MoreTags.RESOURCE_NAME" "Produce Topic $STREAM_PROCESSED"
@ -178,31 +193,12 @@ class KafkaStreamsTest extends AgentTestRunner {
"$Tags.COMPONENT" "java-kafka"
}
}
}
trace(1, 1) {
// CONSUMER span 0
span(0) {
span(4) {
operationName "kafka.consume"
spanKind CONSUMER
errored false
hasLink traces[0][0]
tags {
"$MoreTags.SERVICE_NAME" "kafka"
"$MoreTags.RESOURCE_NAME" "Consume Topic $STREAM_PENDING"
"$MoreTags.SPAN_TYPE" "queue"
"$Tags.COMPONENT" "java-kafka"
"partition" { it >= 0 }
"offset" 0
}
}
}
trace(2, 1) {
// CONSUMER span 0
span(0) {
operationName "kafka.consume"
spanKind CONSUMER
errored false
hasLink traces[0][2]
childOf span(3)
tags {
"$MoreTags.SERVICE_NAME" "kafka"
"$MoreTags.RESOURCE_NAME" "Consume Topic $STREAM_PROCESSED"
@ -228,8 +224,8 @@ class KafkaStreamsTest extends AgentTestRunner {
return null
}
})
spanContext.traceId == TEST_WRITER.traces[0][2].traceId
spanContext.spanId == TEST_WRITER.traces[0][2].spanId
spanContext.traceId == TEST_WRITER.traces[0][3].traceId
spanContext.spanId == TEST_WRITER.traces[0][3].spanId
cleanup:

View File

@ -51,6 +51,7 @@ import io.opentelemetry.auto.instrumentation.api.Tags;
import io.opentelemetry.auto.tooling.Instrumenter;
import io.opentelemetry.context.Scope;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.SpanContext;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
@ -250,11 +251,19 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default {
final Map<String, Object> headers = response.getProps().getHeaders();
if (headers != null) {
SpanContext spanContext = null;
try {
spanBuilder.addLink(TRACER.getHttpTextFormat().extract(headers, GETTER));
spanContext = TRACER.getHttpTextFormat().extract(headers, GETTER);
} catch (final IllegalArgumentException e) {
// couldn't extract a context
}
if (spanContext != null) {
if (TRACER.getCurrentSpan().getContext().isValid()) {
spanBuilder.addLink(spanContext);
} else {
spanBuilder.setParent(spanContext);
}
}
}
}

View File

@ -126,11 +126,11 @@ class RabbitMQTest extends AgentTestRunner {
tags {
}
}
rabbitSpan(it, 1, "exchange.declare", false, span(0))
rabbitSpan(it, 2, "queue.declare", false, span(0))
rabbitSpan(it, 3, "queue.bind", false, span(0))
rabbitSpan(it, 4, "basic.publish $exchangeName -> $routingKey", false, span(0))
rabbitSpan(it, 5, "basic.get <generated>", true, span(4))
rabbitSpan(it, 1, "exchange.declare", span(0))
rabbitSpan(it, 2, "queue.declare", span(0))
rabbitSpan(it, 3, "queue.bind", span(0))
rabbitSpan(it, 4, "basic.publish $exchangeName -> $routingKey", span(0))
rabbitSpan(it, 5, "basic.get <generated>", span(0), span(4))
}
}
@ -151,15 +151,13 @@ class RabbitMQTest extends AgentTestRunner {
new String(response.getBody()) == "Hello, world!"
and:
assertTraces(3) {
assertTraces(2) {
trace(0, 1) {
rabbitSpan(it, 0, "queue.declare")
}
trace(1, 1) {
trace(1, 2) {
rabbitSpan(it, 0, "basic.publish <default> -> <generated>")
}
trace(2, 1) {
rabbitSpan(it, 0, "basic.get <generated>", true, traces[1][0])
rabbitSpan(it, 1, "basic.get <generated>", span(0))
}
}
}
@ -205,7 +203,7 @@ class RabbitMQTest extends AgentTestRunner {
(1..messageCount).each {
trace(3 + it, 2) {
rabbitSpan(it, 0, "basic.publish $exchangeName -> <all>")
rabbitSpan(it, 1, resource, true, span(0))
rabbitSpan(it, 1, resource, span(0))
}
}
}
@ -252,7 +250,7 @@ class RabbitMQTest extends AgentTestRunner {
}
trace(4, 2) {
rabbitSpan(it, 0, "basic.publish $exchangeName -> <all>")
rabbitSpan(it, 1, "basic.deliver <generated>", true, span(0), error, error.message)
rabbitSpan(it, 1, "basic.deliver <generated>", span(0), null, error, error.message)
}
}
@ -271,7 +269,7 @@ class RabbitMQTest extends AgentTestRunner {
assertTraces(1) {
trace(0, 1) {
rabbitSpan(it, command, false, null, throwable, errorMsg)
rabbitSpan(it, command, null, null, throwable, errorMsg)
}
}
@ -302,15 +300,13 @@ class RabbitMQTest extends AgentTestRunner {
message == "foo"
and:
assertTraces(3) {
assertTraces(2) {
trace(0, 1) {
rabbitSpan(it, "queue.declare")
}
trace(1, 1) {
trace(1, 2) {
rabbitSpan(it, 0, "basic.publish <default> -> some-routing-queue")
}
trace(2, 1) {
rabbitSpan(it, 0, "basic.get $queue.name", true, traces[1][0])
rabbitSpan(it, 1, "basic.get $queue.name", span(0))
}
}
}
@ -318,20 +314,20 @@ class RabbitMQTest extends AgentTestRunner {
def rabbitSpan(
TraceAssert trace,
String resource,
Boolean distributedRootSpan = false,
Object parentSpan = null,
Object linkSpan = null,
Throwable exception = null,
String errorMsg = null
) {
rabbitSpan(trace, 0, resource, distributedRootSpan, parentSpan, exception, errorMsg)
rabbitSpan(trace, 0, resource, parentSpan, linkSpan, exception, errorMsg)
}
def rabbitSpan(
TraceAssert trace,
int index,
String resource,
Boolean distributedRootSpan = false,
Object parentOrLinkSpan = null,
Object parentSpan = null,
Object linkSpan = null,
Throwable exception = null,
String errorMsg = null
) {
@ -352,16 +348,16 @@ class RabbitMQTest extends AgentTestRunner {
spanKind CLIENT
}
if (parentOrLinkSpan) {
if (trace.span(index).attributes.get("amqp.command")?.stringValue == "basic.get") {
hasLink((SpanData) parentOrLinkSpan)
} else {
childOf((SpanData) parentOrLinkSpan)
}
if (parentSpan) {
childOf((SpanData) parentSpan)
} else {
parent()
}
if (linkSpan) {
hasLink((SpanData) linkSpan)
}
errored exception != null
tags {