Link RabbitMQ receive span with the producer span (#6808)

Similar to #6804, but for RabbitMQ.
Also changed the span kind of the receive span to `CONSUMER`, to match
the spec.
This commit is contained in:
Mateusz Rzeszutek 2022-10-06 11:20:19 +02:00 committed by GitHub
parent 6f6af66c04
commit ab0c875015
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 156 additions and 92 deletions

View File

@ -32,5 +32,7 @@ dependencies {
tasks.withType<Test>().configureEach {
// TODO run tests both with and without experimental span attributes
jvmArgs("-Dotel.instrumentation.rabbitmq.experimental-span-attributes=true")
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].getService())
}

View File

@ -18,6 +18,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperat
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import java.util.ArrayList;
@ -29,18 +30,15 @@ public final class RabbitSingletons {
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.rabbitmq.experimental-span-attributes", false);
private static final String instrumentationName = "io.opentelemetry.rabbitmq-2.7";
private static final Instrumenter<ChannelAndMethod, Void> channelInstrumenter;
private static final Instrumenter<ReceiveRequest, GetResponse> receiveInstrumenter;
private static final Instrumenter<DeliveryRequest, Void> deliverInstrumenter;
private static final Instrumenter<ChannelAndMethod, Void> channelInstrumenter =
createChannelInstrumenter();
private static final Instrumenter<ReceiveRequest, GetResponse> receiveInstrumenter =
createReceiveInstrumenter();
private static final Instrumenter<DeliveryRequest, Void> deliverInstrumenter =
createDeliverInstrumenter();
static final ContextKey<RabbitChannelAndMethodHolder> CHANNEL_AND_METHOD_CONTEXT_KEY =
ContextKey.named("opentelemetry-rabbitmq-channel-and-method-context-key");
static {
channelInstrumenter = createChannelInstrumenter();
receiveInstrumenter = createReceiveInstrumenter();
deliverInstrumenter = createDeliverInstrumenter();
}
public static Instrumenter<ChannelAndMethod, Void> channelInstrumenter() {
return channelInstrumenter;
}
@ -82,7 +80,12 @@ public final class RabbitSingletons {
return Instrumenter.<ReceiveRequest, GetResponse>builder(
GlobalOpenTelemetry.get(), instrumentationName, ReceiveRequest::spanName)
.addAttributesExtractors(extractors)
.buildInstrumenter(SpanKindExtractor.alwaysClient());
.setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
.addSpanLinksExtractor(
new PropagatorBasedSpanLinksExtractor<>(
GlobalOpenTelemetry.getPropagators().getTextMapPropagator(),
ReceiveRequestTextMapGetter.INSTANCE))
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}
private static Instrumenter<DeliveryRequest, Void> createDeliverInstrumenter() {

View File

@ -0,0 +1,40 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.GetResponse;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
enum ReceiveRequestTextMapGetter implements TextMapGetter<ReceiveRequest> {
INSTANCE;
@Override
public Iterable<String> keys(ReceiveRequest carrier) {
return Optional.of(carrier)
.map(ReceiveRequest::getResponse)
.map(GetResponse::getProps)
.map(AMQP.BasicProperties::getHeaders)
.map(Map::keySet)
.orElse(Collections.emptySet());
}
@Nullable
@Override
public String get(@Nullable ReceiveRequest carrier, String key) {
return Optional.ofNullable(carrier)
.map(ReceiveRequest::getResponse)
.map(GetResponse::getProps)
.map(AMQP.BasicProperties::getHeaders)
.map(headers -> headers.get(key))
.map(Object::toString)
.orElse(null);
}
}

View File

@ -53,11 +53,14 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb
def "test rabbit publish/get"() {
setup:
GetResponse response = runWithSpan("parent") {
String queueName = runWithSpan("producer parent") {
channel.exchangeDeclare(exchangeName, "direct", false)
String queueName = channel.queueDeclare().getQueue()
channel.queueBind(queueName, exchangeName, routingKey)
channel.basicPublish(exchangeName, routingKey, null, "Hello, world!".getBytes())
return queueName
}
GetResponse response = runWithSpan("consumer parent") {
return channel.basicGet(queueName, true)
}
@ -65,18 +68,28 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb
new String(response.getBody()) == "Hello, world!"
and:
assertTraces(1) {
trace(0, 6) {
assertTraces(2) {
SpanData producerSpan
trace(0, 5) {
span(0) {
name "parent"
attributes {
}
name "producer parent"
hasNoParent()
}
rabbitSpan(it, 1, null, null, null, "exchange.declare", span(0))
rabbitSpan(it, 2, null, null, null, "queue.declare", span(0))
rabbitSpan(it, 3, null, null, null, "queue.bind", span(0))
rabbitSpan(it, 4, exchangeName, routingKey, "send", "$exchangeName", span(0))
rabbitSpan(it, 5, exchangeName, routingKey, "receive", "<generated>", span(0))
producerSpan = span(4)
}
trace(1, 2) {
span(0) {
name "consumer parent"
hasNoParent()
}
rabbitSpan(it, 1, exchangeName, routingKey, "receive", "<generated>", span(0), producerSpan)
}
}
@ -87,24 +100,39 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb
def "test rabbit publish/get default exchange"() {
setup:
String queueName = channel.queueDeclare().getQueue()
channel.basicPublish("", queueName, null, "Hello, world!".getBytes())
GetResponse response = channel.basicGet(queueName, true)
String queueName = runWithSpan("producer parent") {
String queueName = channel.queueDeclare().getQueue()
channel.basicPublish("", queueName, null, "Hello, world!".getBytes())
return queueName
}
GetResponse response = runWithSpan("consumer parent") {
return channel.basicGet(queueName, true)
}
expect:
new String(response.getBody()) == "Hello, world!"
and:
assertTraces(3) {
traces.subList(1, 3).sort(orderByRootSpanKind(PRODUCER, CLIENT))
trace(0, 1) {
rabbitSpan(it, 0, null, null, null, "queue.declare")
assertTraces(2) {
SpanData producerSpan
trace(0, 3) {
span(0) {
name "producer parent"
hasNoParent()
}
rabbitSpan(it, 1, null, null, null, "queue.declare", span(0))
rabbitSpan(it, 2, "<default>", null, "send", "<default>", span(0))
producerSpan = span(2)
}
trace(1, 1) {
rabbitSpan(it, 0, "<default>", null, "send", "<default>")
}
trace(2, 1) {
rabbitSpan(it, 0, "<default>", null, "receive", "<generated>", null)
trace(1, 2) {
span(0) {
name "consumer parent"
hasNoParent()
}
rabbitSpan(it, 1, "<default>", null, "receive", "<generated>", span(0), producerSpan)
}
}
}
@ -142,16 +170,16 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb
expect:
assertTraces(4 + messageCount) {
trace(0, 1) {
rabbitSpan(it, null, null, null, "exchange.declare")
rabbitSpan(it, 0, null, null, null, "exchange.declare")
}
trace(1, 1) {
rabbitSpan(it, null, null, null, "queue.declare")
rabbitSpan(it, 0, null, null, null, "queue.declare")
}
trace(2, 1) {
rabbitSpan(it, null, null, null, "queue.bind")
rabbitSpan(it, 0, null, null, null, "queue.bind")
}
trace(3, 1) {
rabbitSpan(it, null, null, null, "basic.consume")
rabbitSpan(it, 0, null, null, null, "basic.consume")
}
(1..messageCount).each {
trace(3 + it, 2) {
@ -197,16 +225,16 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb
expect:
assertTraces(5) {
trace(0, 1) {
rabbitSpan(it, null, null, null, "exchange.declare")
rabbitSpan(it, 0, null, null, null, "exchange.declare")
}
trace(1, 1) {
rabbitSpan(it, null, null, null, "queue.declare")
rabbitSpan(it, 0, null, null, null, "queue.declare")
}
trace(2, 1) {
rabbitSpan(it, null, null, null, "queue.bind")
rabbitSpan(it, 0, null, null, null, "queue.bind")
}
trace(3, 1) {
rabbitSpan(it, null, null, null, "basic.consume")
rabbitSpan(it, 0, null, null, null, "basic.consume")
}
trace(4, 2) {
rabbitSpan(it, 0, exchangeName, null, "send", "$exchangeName")
@ -229,7 +257,7 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb
assertTraces(1) {
trace(0, 1) {
rabbitSpan(it, null, null, operation, command, null, null, error, errorMsg)
rabbitSpan(it, 0, null, null, operation, command, null, null, error, errorMsg)
}
}
@ -250,26 +278,41 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb
setup:
def connectionFactory = new CachingConnectionFactory(connectionFactory)
AmqpAdmin admin = new RabbitAdmin(connectionFactory)
def queue = new Queue("some-routing-queue", false, true, true, null)
admin.declareQueue(queue)
AmqpTemplate template = new RabbitTemplate(connectionFactory)
template.convertAndSend(queue.name, "foo")
String message = (String) template.receiveAndConvert(queue.name)
def queue = new Queue("some-routing-queue", false, true, true, null)
runWithSpan("producer parent") {
admin.declareQueue(queue)
template.convertAndSend(queue.name, "foo")
}
String message = runWithSpan("consumer parent") {
return template.receiveAndConvert(queue.name) as String
}
expect:
message == "foo"
and:
assertTraces(3) {
traces.subList(1, 3).sort(orderByRootSpanKind(PRODUCER, CLIENT))
trace(0, 1) {
rabbitSpan(it, null, null, null, "queue.declare")
assertTraces(2) {
SpanData producerSpan
trace(0, 3) {
span(0) {
name "producer parent"
hasNoParent()
}
rabbitSpan(it, 1, null, null, null, "queue.declare", span(0))
rabbitSpan(it, 2, "<default>", "some-routing-queue", "send", "<default>", span(0))
producerSpan = span(2)
}
trace(1, 1) {
rabbitSpan(it, 0, "<default>", "some-routing-queue", "send", "<default>")
}
trace(2, 1) {
rabbitSpan(it, 0, "<default>", "some-routing-queue", "receive", queue.name, null)
trace(1, 2) {
span(0) {
name "consumer parent"
hasNoParent()
}
rabbitSpan(it, 1, "<default>", "some-routing-queue", "receive", queue.name, span(0), producerSpan)
}
}
}
@ -303,7 +346,7 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb
rabbitSpan(it, 0, null, null, null, "queue.declare")
}
trace(1, 2) {
rabbitSpan(it, 0, "<default>", null, "send", "<default>", true)
rabbitSpan(it, 0, "<default>", null, "send", "<default>", null, null, null, null, false, true)
rabbitSpan(it, 1, "<default>", null, "process", "<generated>", span(0), null, null, null, false, true)
}
trace(2, 1) {
@ -312,21 +355,6 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb
}
}
def rabbitSpan(
TraceAssert trace,
String exchange,
String routingKey,
String operation,
String resource,
Object parentSpan = null,
Object linkSpan = null,
Throwable exception = null,
String errorMsg = null,
boolean expectTimestamp = false
) {
rabbitSpan(trace, 0, exchange, routingKey, operation, resource, parentSpan, linkSpan, exception, errorMsg, expectTimestamp)
}
def rabbitSpan(
TraceAssert trace,
int index,
@ -334,20 +362,8 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb
String routingKey,
String operation,
String resource,
boolean testHeaders
) {
rabbitSpan(trace, index, exchange, routingKey, operation, resource, null, null, null, null, false, testHeaders)
}
def rabbitSpan(
TraceAssert trace,
int index,
String exchange,
String routingKey,
String operation,
String resource,
Object parentSpan = null,
Object linkSpan = null,
SpanData parentSpan = null,
SpanData linkSpan = null,
Throwable exception = null,
String errorMsg = null,
boolean expectTimestamp = false,
@ -359,14 +375,14 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb
spanName = spanName + " " + operation
}
def rabbitCommand = trace.span(index).attributes.get(AttributeKey.stringKey("rabbitmq.command"))
def spanKind
switch (trace.span(index).attributes.get(AttributeKey.stringKey("rabbitmq.command"))) {
switch (rabbitCommand) {
case "basic.publish":
spanKind = PRODUCER
break
case "basic.get":
spanKind = CLIENT
break
case "basic.get": // fallthrough
case "basic.deliver":
spanKind = CONSUMER
break
@ -378,14 +394,16 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb
name spanName
kind spanKind
if (parentSpan) {
childOf((SpanData) parentSpan)
} else {
if (parentSpan == null) {
hasNoParent()
} else {
childOf(parentSpan)
}
if (linkSpan) {
hasLink((SpanData) linkSpan)
if (linkSpan == null) {
hasNoLinks()
} else {
hasLink(linkSpan)
}
if (exception) {
@ -394,7 +412,8 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb
}
attributes {
if (spanKind != CONSUMER) {
// listener does not have access to net attributes
if (rabbitCommand != "basic.deliver") {
"net.sock.peer.addr" { it == "127.0.0.1" || it == "0:0:0:0:0:0:0:1" || it == null }
"net.sock.peer.port" Long
"net.sock.family" { it == null || it == "inet6" }
@ -415,7 +434,7 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb
"messaging.header.test_message_header" { it == ["test"] }
}
switch (trace.span(index).attributes.get(AttributeKey.stringKey("rabbitmq.command"))) {
switch (rabbitCommand) {
case "basic.publish":
"rabbitmq.command" "basic.publish"
"$SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY" {