Better span names for rabbitmq-amqp (#273)
This commit is contained in:
parent
ef88d6f962
commit
a731e70810
|
@ -135,14 +135,13 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default {
|
|||
|
||||
final Connection connection = channel.getConnection();
|
||||
|
||||
final Span.Builder spanBuilder = TRACER.spanBuilder("amqp.command");
|
||||
final Span.Builder spanBuilder = TRACER.spanBuilder(method);
|
||||
if (method.equals("Channel.basicPublish")) {
|
||||
spanBuilder.setSpanKind(PRODUCER);
|
||||
} else {
|
||||
spanBuilder.setSpanKind(CLIENT);
|
||||
}
|
||||
final Span span = spanBuilder.startSpan();
|
||||
span.setAttribute(MoreTags.RESOURCE_NAME, method);
|
||||
span.setAttribute(MoreTags.NET_PEER_PORT, connection.getPort());
|
||||
DECORATE.afterStart(span);
|
||||
DECORATE.onPeerConnection(span, connection.getAddress());
|
||||
|
@ -242,7 +241,7 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default {
|
|||
// span creation
|
||||
final Span.Builder spanBuilder =
|
||||
TRACER
|
||||
.spanBuilder("amqp.command")
|
||||
.spanBuilder(DECORATE.spanNameOnGet(queue))
|
||||
.setSpanKind(CLIENT)
|
||||
.setStartTimestamp(TimeUnit.MILLISECONDS.toNanos(startTime));
|
||||
|
||||
|
|
|
@ -19,7 +19,6 @@ import com.rabbitmq.client.Command;
|
|||
import com.rabbitmq.client.Envelope;
|
||||
import io.opentelemetry.OpenTelemetry;
|
||||
import io.opentelemetry.auto.bootstrap.instrumentation.decorator.ClientDecorator;
|
||||
import io.opentelemetry.auto.instrumentation.api.MoreTags;
|
||||
import io.opentelemetry.trace.Span;
|
||||
import io.opentelemetry.trace.Tracer;
|
||||
|
||||
|
@ -46,7 +45,7 @@ public class RabbitDecorator extends ClientDecorator {
|
|||
routingKey == null || routingKey.isEmpty()
|
||||
? "<all>"
|
||||
: routingKey.startsWith("amq.gen-") ? "<generated>" : routingKey;
|
||||
span.setAttribute(MoreTags.RESOURCE_NAME, "basic.publish " + exchangeName + " -> " + routing);
|
||||
span.updateName(exchangeName + " -> " + routing);
|
||||
span.setAttribute("amqp.command", "basic.publish");
|
||||
if (exchange != null && !exchange.isEmpty()) {
|
||||
span.setAttribute("amqp.exchange", exchange);
|
||||
|
@ -56,22 +55,26 @@ public class RabbitDecorator extends ClientDecorator {
|
|||
}
|
||||
}
|
||||
|
||||
public void onGet(final Span span, final String queue) {
|
||||
final String queueName = queue.startsWith("amq.gen-") ? "<generated>" : queue;
|
||||
span.setAttribute(MoreTags.RESOURCE_NAME, "basic.get " + queueName);
|
||||
public String spanNameOnGet(final String queue) {
|
||||
return queue.startsWith("amq.gen-") ? "<generated>" : queue;
|
||||
}
|
||||
|
||||
public void onGet(final Span span, final String queue) {
|
||||
span.setAttribute("amqp.command", "basic.get");
|
||||
span.setAttribute("amqp.queue", queue);
|
||||
}
|
||||
|
||||
public void onDeliver(final Span span, final String queue, final Envelope envelope) {
|
||||
String queueName = queue;
|
||||
public String spanNameOnDeliver(final String queue) {
|
||||
if (queue == null || queue.isEmpty()) {
|
||||
queueName = "<default>";
|
||||
return "<default>";
|
||||
} else if (queue.startsWith("amq.gen-")) {
|
||||
queueName = "<generated>";
|
||||
return "<generated>";
|
||||
} else {
|
||||
return queue;
|
||||
}
|
||||
span.setAttribute(MoreTags.RESOURCE_NAME, "basic.deliver " + queueName);
|
||||
}
|
||||
|
||||
public void onDeliver(final Span span, final Envelope envelope) {
|
||||
span.setAttribute("amqp.command", "basic.deliver");
|
||||
|
||||
if (envelope != null) {
|
||||
|
@ -90,8 +93,7 @@ public class RabbitDecorator extends ClientDecorator {
|
|||
final String name = command.getMethod().protocolMethodName();
|
||||
|
||||
if (!name.equals("basic.publish")) {
|
||||
// Don't overwrite the name already set.
|
||||
span.setAttribute(MoreTags.RESOURCE_NAME, name);
|
||||
span.updateName(name);
|
||||
}
|
||||
span.setAttribute("amqp.command", name);
|
||||
}
|
||||
|
|
|
@ -81,7 +81,8 @@ public class TracedDelegatingConsumer implements Consumer {
|
|||
Scope scope = null;
|
||||
try {
|
||||
final Map<String, Object> headers = properties.getHeaders();
|
||||
final Span.Builder spanBuilder = TRACER.spanBuilder("amqp.command").setSpanKind(CONSUMER);
|
||||
final Span.Builder spanBuilder =
|
||||
TRACER.spanBuilder(DECORATE.spanNameOnDeliver(queue)).setSpanKind(CONSUMER);
|
||||
SpanContext extractedContext = SpanContext.getInvalid();
|
||||
if (headers != null) {
|
||||
extractedContext = TRACER.getHttpTextFormat().extract(headers, GETTER);
|
||||
|
@ -98,7 +99,7 @@ public class TracedDelegatingConsumer implements Consumer {
|
|||
span.setAttribute("message.size", body == null ? 0 : body.length);
|
||||
span.setAttribute("span.origin.type", delegate.getClass().getName());
|
||||
DECORATE.afterStart(span);
|
||||
DECORATE.onDeliver(span, queue, envelope);
|
||||
DECORATE.onDeliver(span, envelope);
|
||||
|
||||
scope = TRACER.withSpan(span);
|
||||
|
||||
|
|
|
@ -128,8 +128,8 @@ class RabbitMQTest extends AgentTestRunner {
|
|||
rabbitSpan(it, 1, "exchange.declare", span(0))
|
||||
rabbitSpan(it, 2, "queue.declare", span(0))
|
||||
rabbitSpan(it, 3, "queue.bind", span(0))
|
||||
rabbitSpan(it, 4, "basic.publish $exchangeName -> $routingKey", span(0))
|
||||
rabbitSpan(it, 5, "basic.get <generated>", span(0), span(4))
|
||||
rabbitSpan(it, 4, "$exchangeName -> $routingKey", span(0))
|
||||
rabbitSpan(it, 5, "<generated>", span(0), span(4))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -155,10 +155,10 @@ class RabbitMQTest extends AgentTestRunner {
|
|||
rabbitSpan(it, 0, "queue.declare")
|
||||
}
|
||||
trace(1, 1) {
|
||||
rabbitSpan(it, 0, "basic.publish <default> -> <generated>")
|
||||
rabbitSpan(it, 0, "<default> -> <generated>")
|
||||
}
|
||||
trace(2, 1) {
|
||||
rabbitSpan(it, 0, "basic.get <generated>", null, traces[1][0])
|
||||
rabbitSpan(it, 0, "<generated>", null, traces[1][0])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -185,7 +185,7 @@ class RabbitMQTest extends AgentTestRunner {
|
|||
(1..messageCount).each {
|
||||
channel.basicPublish(exchangeName, "", null, "msg $it".getBytes())
|
||||
}
|
||||
def resource = messageCount % 2 == 0 ? "basic.deliver <generated>" : "basic.deliver $queueName"
|
||||
def resource = messageCount % 2 == 0 ? "<generated>" : queueName
|
||||
|
||||
expect:
|
||||
assertTraces(4 + messageCount) {
|
||||
|
@ -203,7 +203,7 @@ class RabbitMQTest extends AgentTestRunner {
|
|||
}
|
||||
(1..messageCount).each {
|
||||
trace(3 + it, 2) {
|
||||
rabbitSpan(it, 0, "basic.publish $exchangeName -> <all>")
|
||||
rabbitSpan(it, 0, "$exchangeName -> <all>")
|
||||
rabbitSpan(it, 1, resource, span(0))
|
||||
}
|
||||
}
|
||||
|
@ -250,8 +250,8 @@ class RabbitMQTest extends AgentTestRunner {
|
|||
rabbitSpan(it, "basic.consume")
|
||||
}
|
||||
trace(4, 2) {
|
||||
rabbitSpan(it, 0, "basic.publish $exchangeName -> <all>")
|
||||
rabbitSpan(it, 1, "basic.deliver <generated>", span(0), null, error, error.message)
|
||||
rabbitSpan(it, 0, "$exchangeName -> <all>")
|
||||
rabbitSpan(it, 1, "<generated>", span(0), null, error, error.message)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -275,14 +275,14 @@ class RabbitMQTest extends AgentTestRunner {
|
|||
}
|
||||
|
||||
where:
|
||||
command | exception | errorMsg | closure
|
||||
"exchange.declare" | IOException | null | {
|
||||
command | exception | errorMsg | closure
|
||||
"exchange.declare" | IOException | null | {
|
||||
it.exchangeDeclare("some-exchange", "invalid-type", true)
|
||||
}
|
||||
"Channel.basicConsume" | IllegalStateException | "Invalid configuration: 'queue' must be non-null." | {
|
||||
"Channel.basicConsume" | IllegalStateException | "Invalid configuration: 'queue' must be non-null." | {
|
||||
it.basicConsume(null, null)
|
||||
}
|
||||
"basic.get <generated>" | IOException | null | {
|
||||
"<generated>" | IOException | null | {
|
||||
it.basicGet("amq.gen-invalid-channel", true)
|
||||
}
|
||||
}
|
||||
|
@ -306,10 +306,10 @@ class RabbitMQTest extends AgentTestRunner {
|
|||
rabbitSpan(it, "queue.declare")
|
||||
}
|
||||
trace(1, 1) {
|
||||
rabbitSpan(it, 0, "basic.publish <default> -> some-routing-queue")
|
||||
rabbitSpan(it, 0, "<default> -> some-routing-queue")
|
||||
}
|
||||
trace(2, 1) {
|
||||
rabbitSpan(it, 0, "basic.get $queue.name", null, traces[1][0])
|
||||
rabbitSpan(it, 0, queue.name, null, traces[1][0])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -335,7 +335,7 @@ class RabbitMQTest extends AgentTestRunner {
|
|||
String errorMsg = null
|
||||
) {
|
||||
trace.span(index) {
|
||||
operationName "amqp.command"
|
||||
operationName resource
|
||||
|
||||
switch (trace.span(index).attributes.get("amqp.command")?.stringValue) {
|
||||
case "basic.publish":
|
||||
|
@ -365,7 +365,6 @@ class RabbitMQTest extends AgentTestRunner {
|
|||
|
||||
tags {
|
||||
"$MoreTags.SERVICE_NAME" "rabbitmq"
|
||||
"$MoreTags.RESOURCE_NAME" resource
|
||||
"$Tags.COMPONENT" "rabbitmq-amqp"
|
||||
"$MoreTags.NET_PEER_NAME" { it == null || it instanceof String }
|
||||
"$MoreTags.NET_PEER_IP" { "127.0.0.1" }
|
||||
|
|
Loading…
Reference in New Issue