From 2f319b930128b0f52157196039e25d335bb5e3b0 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Wed, 28 Nov 2018 08:36:15 -0800 Subject: [PATCH] Consumer delegate should rethrow the exception MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously the delegate would swallow the exception and not rethrow. I also added a test to attempt to verify, but the exception doesn’t seem to be observable in the test. (See #602) --- .../amqp/TracedDelegatingConsumer.java | 3 +- .../src/test/groovy/RabbitMQTest.groovy | 61 ++++++++++++++++++- 2 files changed, 60 insertions(+), 4 deletions(-) diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java index d3571b0750..7ff8bb1874 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java @@ -53,7 +53,7 @@ public class TracedDelegatingConsumer implements Consumer { } @Override - public void handleRecoverOk(String consumerTag) { + public void handleRecoverOk(final String consumerTag) { delegate.handleRecoverOk(consumerTag); } @@ -105,6 +105,7 @@ public class TracedDelegatingConsumer implements Consumer { final Span span = scope.span(); Tags.ERROR.set(span, true); span.log(Collections.singletonMap(ERROR_OBJECT, throwable)); + throw throwable; } finally { scope.close(); } diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy index f7211e4051..a9cd6b8612 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy @@ -209,6 +209,61 @@ class RabbitMQTest extends AgentTestRunner { messageCount << (1..4) } + def "test rabbit consume error"() { + setup: + def error = new FileNotFoundException("Message Error") + channel.exchangeDeclare(exchangeName, "direct", false) + String queueName = channel.queueDeclare().getQueue() + channel.queueBind(queueName, exchangeName, "") + + def phaser = new Phaser() + phaser.register() + phaser.register() + + Consumer callback = new DefaultConsumer(channel) { + @Override + void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + phaser.arriveAndAwaitAdvance() // Ensure publish spans are reported first. + throw error + // Unfortunately this doesn't seem to be observable in the test outside of the span generated. + } + } + + channel.basicConsume(queueName, callback) + + TEST_WRITER.waitForTraces(2) + channel.basicPublish(exchangeName, "", null, "msg".getBytes()) + TEST_WRITER.waitForTraces(3) + phaser.arriveAndAwaitAdvance() + + expect: + assertTraces(6) { + trace(0, 1) { + rabbitSpan(it, "exchange.declare") + } + trace(1, 1) { + rabbitSpan(it, "queue.declare") + } + trace(2, 1) { + rabbitSpan(it, "queue.bind") + } + trace(3, 1) { + rabbitSpan(it, "basic.consume") + } + def publishSpan = null + trace(4, 1) { + publishSpan = span(0) + rabbitSpan(it, "basic.publish $exchangeName -> ") + } + trace(5, 1) { + rabbitSpan(it, "basic.deliver ", true, publishSpan, error, error.message) + } + } + + where: + exchangeName = "some-error-exchange" + } + def "test rabbit error (#command)"() { when: closure.call(channel) @@ -310,7 +365,7 @@ class RabbitMQTest extends AgentTestRunner { "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_PRODUCER "$DDTags.SPAN_TYPE" DDSpanTypes.MESSAGE_PRODUCER "amqp.command" "basic.publish" - "amqp.exchange" { it == null || it == "some-exchange" } + "amqp.exchange" { it == null || it == "some-exchange" || it == "some-error-exchange" } "amqp.routing_key" { it == null || it == "some-routing-key" || it == "some-routing-queue" || it.startsWith("amq.gen-") } @@ -328,8 +383,8 @@ class RabbitMQTest extends AgentTestRunner { "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CONSUMER "$DDTags.SPAN_TYPE" DDSpanTypes.MESSAGE_CONSUMER "amqp.command" "basic.deliver" - "span.origin.type" "RabbitMQTest\$1" - "amqp.exchange" "some-exchange" + "span.origin.type" { it == "RabbitMQTest\$1" || it == "RabbitMQTest\$2" } + "amqp.exchange" { it == "some-exchange" || it == "some-error-exchange" } "message.size" Integer break default: