From 6ce3bd8c14939e701a8b0136fc0b9e0a5d03c20e Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Thu, 11 Jan 2024 17:49:03 +0200 Subject: [PATCH] Add network attributes to rabbitmq process spans (#10210) --- .../rabbitmq/DeliveryRequest.java | 11 ++++- .../RabbitChannelInstrumentation.java | 3 +- .../RabbitDeliveryNetAttributesGetter.java | 40 +++++++++++++++++++ .../RabbitReceiveNetAttributesGetter.java | 2 +- .../rabbitmq/RabbitSingletons.java | 1 + .../rabbitmq/TracedDelegatingConsumer.java | 7 +++- .../rabbitmq/RabbitMqTest.java | 6 +-- .../SpringIntegrationAndRabbitTest.groovy | 3 ++ .../rabbit/v1_0/ContextPropagationTest.java | 3 +- 9 files changed, 64 insertions(+), 12 deletions(-) create mode 100644 instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryNetAttributesGetter.java diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/DeliveryRequest.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/DeliveryRequest.java index d8e8a73517..afe8b6d10c 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/DeliveryRequest.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/DeliveryRequest.java @@ -7,20 +7,27 @@ package io.opentelemetry.javaagent.instrumentation.rabbitmq; import com.google.auto.value.AutoValue; import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Connection; import com.rabbitmq.client.Envelope; @AutoValue abstract class DeliveryRequest { static DeliveryRequest create( - String queue, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { - return new AutoValue_DeliveryRequest(queue, envelope, properties, body); + String queue, + Envelope envelope, + Connection connection, + AMQP.BasicProperties properties, + byte[] body) { + return new AutoValue_DeliveryRequest(queue, envelope, connection, properties, body); } abstract String getQueue(); abstract Envelope getEnvelope(); + abstract Connection getConnection(); + abstract AMQP.BasicProperties getProperties(); @SuppressWarnings("mutable") diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java index 189c587bbc..8f49106d40 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java @@ -235,11 +235,12 @@ public class RabbitChannelInstrumentation implements TypeInstrumentation { @Advice.OnMethodEnter(suppress = Throwable.class) public static void wrapConsumer( + @Advice.This Channel channel, @Advice.Argument(0) String queue, @Advice.Argument(value = 6, readOnly = false) Consumer consumer) { // We have to save off the queue name here because it isn't available to the consumer later. if (consumer != null && !(consumer instanceof TracedDelegatingConsumer)) { - consumer = new TracedDelegatingConsumer(queue, consumer); + consumer = new TracedDelegatingConsumer(queue, consumer, channel.getConnection()); } } } diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryNetAttributesGetter.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryNetAttributesGetter.java new file mode 100644 index 0000000000..047c0ac18a --- /dev/null +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryNetAttributesGetter.java @@ -0,0 +1,40 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rabbitmq; + +import io.opentelemetry.instrumentation.api.semconv.network.NetworkAttributesGetter; +import java.net.Inet4Address; +import java.net.Inet6Address; +import java.net.InetAddress; +import javax.annotation.Nullable; + +public class RabbitDeliveryNetAttributesGetter + implements NetworkAttributesGetter { + + @Nullable + @Override + public String getNetworkType(DeliveryRequest request, @Nullable Void response) { + InetAddress address = request.getConnection().getAddress(); + if (address instanceof Inet4Address) { + return "ipv4"; + } else if (address instanceof Inet6Address) { + return "ipv6"; + } + return null; + } + + @Nullable + @Override + public String getNetworkPeerAddress(DeliveryRequest request, @Nullable Void response) { + return request.getConnection().getAddress().getHostAddress(); + } + + @Nullable + @Override + public Integer getNetworkPeerPort(DeliveryRequest request, @Nullable Void response) { + return request.getConnection().getPort(); + } +} diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveNetAttributesGetter.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveNetAttributesGetter.java index 32841b8935..621d72a27e 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveNetAttributesGetter.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveNetAttributesGetter.java @@ -20,7 +20,7 @@ public class RabbitReceiveNetAttributesGetter public String getNetworkType(ReceiveRequest request, @Nullable GetResponse response) { InetAddress address = request.getConnection().getAddress(); if (address instanceof Inet4Address) { - return "ipv6"; + return "ipv4"; } else if (address instanceof Inet6Address) { return "ipv6"; } diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java index cba13690a1..59490a7985 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java @@ -96,6 +96,7 @@ public final class RabbitSingletons { extractors.add( buildMessagingAttributesExtractor( RabbitDeliveryAttributesGetter.INSTANCE, MessageOperation.PROCESS)); + extractors.add(NetworkAttributesExtractor.create(new RabbitDeliveryNetAttributesGetter())); extractors.add(new RabbitDeliveryExtraAttributesExtractor()); if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { extractors.add(new RabbitDeliveryExperimentalAttributesExtractor()); diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/TracedDelegatingConsumer.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/TracedDelegatingConsumer.java index b94fb624d5..2737d0a947 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/TracedDelegatingConsumer.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/TracedDelegatingConsumer.java @@ -8,6 +8,7 @@ package io.opentelemetry.javaagent.instrumentation.rabbitmq; import static io.opentelemetry.javaagent.instrumentation.rabbitmq.RabbitSingletons.deliverInstrumenter; import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; @@ -23,10 +24,12 @@ public class TracedDelegatingConsumer implements Consumer { private final String queue; private final Consumer delegate; + private final Connection connection; - public TracedDelegatingConsumer(String queue, Consumer delegate) { + public TracedDelegatingConsumer(String queue, Consumer delegate, Connection connection) { this.queue = queue; this.delegate = delegate; + this.connection = connection; } @Override @@ -59,7 +62,7 @@ public class TracedDelegatingConsumer implements Consumer { String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { Context parentContext = Context.current(); - DeliveryRequest request = DeliveryRequest.create(queue, envelope, properties, body); + DeliveryRequest request = DeliveryRequest.create(queue, envelope, connection, properties, body); if (!deliverInstrumenter().shouldStart(parentContext, request)) { delegate.handleDelivery(consumerTag, envelope, properties, body); diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitMqTest.java b/instrumentation/rabbitmq-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitMqTest.java index 02f2ca634d..ee71482307 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitMqTest.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitMqTest.java @@ -760,11 +760,7 @@ class RabbitMqTest extends AbstractRabbitMqTest { verifyException(span, exception, errorMsg); } - // listener does not have access to net attributes - if (!"basic.deliver".equals(rabbitCommand)) { - verifyNetAttributes(span); - } - + verifyNetAttributes(span); verifyMessagingAttributes(span, exchange, routingKey, operation); if (expectTimestamp) { diff --git a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringIntegrationAndRabbitTest.groovy b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringIntegrationAndRabbitTest.groovy index 93a280c03b..cfc43e6f52 100644 --- a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringIntegrationAndRabbitTest.groovy +++ b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringIntegrationAndRabbitTest.groovy @@ -74,6 +74,9 @@ class SpringIntegrationAndRabbitTest extends AgentInstrumentationSpecification i childOf span(3) kind CONSUMER attributes { + "$NetworkAttributes.NETWORK_PEER_ADDRESS" { it == "127.0.0.1" || it == "0:0:0:0:0:0:0:1" || it == null } + "$NetworkAttributes.NETWORK_PEER_PORT" Long + "$SemanticAttributes.NETWORK_TYPE" { it == "ipv4" || it == "ipv6" || it == null } "$SemanticAttributes.MESSAGING_SYSTEM" "rabbitmq" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testTopic" "$SemanticAttributes.MESSAGING_OPERATION" "process" diff --git a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/v1_0/ContextPropagationTest.java b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/v1_0/ContextPropagationTest.java index 03392d1dc9..e2b026b558 100644 --- a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/v1_0/ContextPropagationTest.java +++ b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/v1_0/ContextPropagationTest.java @@ -172,7 +172,8 @@ public class ContextPropagationTest { .hasKind(SpanKind.CONSUMER) .hasParent(trace.getSpan(1)) .hasAttributesSatisfyingExactly( - getAssertions("", "process", null, true, testHeaders)), + getAssertions( + "", "process", "127.0.0.1", true, testHeaders)), // created by spring-rabbit instrumentation span -> span.hasName("testQueue process")