Add network attributes to rabbitmq process spans (#10210)

This commit is contained in:
Lauri Tulmin 2024-01-11 17:49:03 +02:00 committed by GitHub
parent 2aaa7b10d7
commit 6ce3bd8c14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 64 additions and 12 deletions

View File

@ -7,20 +7,27 @@ package io.opentelemetry.javaagent.instrumentation.rabbitmq;
import com.google.auto.value.AutoValue;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Envelope;
@AutoValue
abstract class DeliveryRequest {
static DeliveryRequest create(
String queue, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
return new AutoValue_DeliveryRequest(queue, envelope, properties, body);
String queue,
Envelope envelope,
Connection connection,
AMQP.BasicProperties properties,
byte[] body) {
return new AutoValue_DeliveryRequest(queue, envelope, connection, properties, body);
}
abstract String getQueue();
abstract Envelope getEnvelope();
abstract Connection getConnection();
abstract AMQP.BasicProperties getProperties();
@SuppressWarnings("mutable")

View File

@ -235,11 +235,12 @@ public class RabbitChannelInstrumentation implements TypeInstrumentation {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapConsumer(
@Advice.This Channel channel,
@Advice.Argument(0) String queue,
@Advice.Argument(value = 6, readOnly = false) Consumer consumer) {
// We have to save off the queue name here because it isn't available to the consumer later.
if (consumer != null && !(consumer instanceof TracedDelegatingConsumer)) {
consumer = new TracedDelegatingConsumer(queue, consumer);
consumer = new TracedDelegatingConsumer(queue, consumer, channel.getConnection());
}
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rabbitmq;
import io.opentelemetry.instrumentation.api.semconv.network.NetworkAttributesGetter;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import javax.annotation.Nullable;
public class RabbitDeliveryNetAttributesGetter
implements NetworkAttributesGetter<DeliveryRequest, Void> {
@Nullable
@Override
public String getNetworkType(DeliveryRequest request, @Nullable Void response) {
InetAddress address = request.getConnection().getAddress();
if (address instanceof Inet4Address) {
return "ipv4";
} else if (address instanceof Inet6Address) {
return "ipv6";
}
return null;
}
@Nullable
@Override
public String getNetworkPeerAddress(DeliveryRequest request, @Nullable Void response) {
return request.getConnection().getAddress().getHostAddress();
}
@Nullable
@Override
public Integer getNetworkPeerPort(DeliveryRequest request, @Nullable Void response) {
return request.getConnection().getPort();
}
}

View File

@ -20,7 +20,7 @@ public class RabbitReceiveNetAttributesGetter
public String getNetworkType(ReceiveRequest request, @Nullable GetResponse response) {
InetAddress address = request.getConnection().getAddress();
if (address instanceof Inet4Address) {
return "ipv6";
return "ipv4";
} else if (address instanceof Inet6Address) {
return "ipv6";
}

View File

@ -96,6 +96,7 @@ public final class RabbitSingletons {
extractors.add(
buildMessagingAttributesExtractor(
RabbitDeliveryAttributesGetter.INSTANCE, MessageOperation.PROCESS));
extractors.add(NetworkAttributesExtractor.create(new RabbitDeliveryNetAttributesGetter()));
extractors.add(new RabbitDeliveryExtraAttributesExtractor());
if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) {
extractors.add(new RabbitDeliveryExperimentalAttributesExtractor());

View File

@ -8,6 +8,7 @@ package io.opentelemetry.javaagent.instrumentation.rabbitmq;
import static io.opentelemetry.javaagent.instrumentation.rabbitmq.RabbitSingletons.deliverInstrumenter;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
@ -23,10 +24,12 @@ public class TracedDelegatingConsumer implements Consumer {
private final String queue;
private final Consumer delegate;
private final Connection connection;
public TracedDelegatingConsumer(String queue, Consumer delegate) {
public TracedDelegatingConsumer(String queue, Consumer delegate, Connection connection) {
this.queue = queue;
this.delegate = delegate;
this.connection = connection;
}
@Override
@ -59,7 +62,7 @@ public class TracedDelegatingConsumer implements Consumer {
String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
Context parentContext = Context.current();
DeliveryRequest request = DeliveryRequest.create(queue, envelope, properties, body);
DeliveryRequest request = DeliveryRequest.create(queue, envelope, connection, properties, body);
if (!deliverInstrumenter().shouldStart(parentContext, request)) {
delegate.handleDelivery(consumerTag, envelope, properties, body);

View File

@ -760,11 +760,7 @@ class RabbitMqTest extends AbstractRabbitMqTest {
verifyException(span, exception, errorMsg);
}
// listener does not have access to net attributes
if (!"basic.deliver".equals(rabbitCommand)) {
verifyNetAttributes(span);
}
verifyNetAttributes(span);
verifyMessagingAttributes(span, exchange, routingKey, operation);
if (expectTimestamp) {

View File

@ -74,6 +74,9 @@ class SpringIntegrationAndRabbitTest extends AgentInstrumentationSpecification i
childOf span(3)
kind CONSUMER
attributes {
"$NetworkAttributes.NETWORK_PEER_ADDRESS" { it == "127.0.0.1" || it == "0:0:0:0:0:0:0:1" || it == null }
"$NetworkAttributes.NETWORK_PEER_PORT" Long
"$SemanticAttributes.NETWORK_TYPE" { it == "ipv4" || it == "ipv6" || it == null }
"$SemanticAttributes.MESSAGING_SYSTEM" "rabbitmq"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testTopic"
"$SemanticAttributes.MESSAGING_OPERATION" "process"

View File

@ -172,7 +172,8 @@ public class ContextPropagationTest {
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
getAssertions("<default>", "process", null, true, testHeaders)),
getAssertions(
"<default>", "process", "127.0.0.1", true, testHeaders)),
// created by spring-rabbit instrumentation
span ->
span.hasName("testQueue process")