Consumer delegate should rethrow the exception
Previously the delegate would swallow the exception and not rethrow. I also added a test to attempt to verify, but the exception doesn’t seem to be observable in the test. (See #602)
This commit is contained in:
parent
1060429ba4
commit
2f319b9301
|
@ -53,7 +53,7 @@ public class TracedDelegatingConsumer implements Consumer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void handleRecoverOk(String consumerTag) {
|
||||
public void handleRecoverOk(final String consumerTag) {
|
||||
delegate.handleRecoverOk(consumerTag);
|
||||
}
|
||||
|
||||
|
@ -105,6 +105,7 @@ public class TracedDelegatingConsumer implements Consumer {
|
|||
final Span span = scope.span();
|
||||
Tags.ERROR.set(span, true);
|
||||
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
|
||||
throw throwable;
|
||||
} finally {
|
||||
scope.close();
|
||||
}
|
||||
|
|
|
@ -209,6 +209,61 @@ class RabbitMQTest extends AgentTestRunner {
|
|||
messageCount << (1..4)
|
||||
}
|
||||
|
||||
def "test rabbit consume error"() {
|
||||
setup:
|
||||
def error = new FileNotFoundException("Message Error")
|
||||
channel.exchangeDeclare(exchangeName, "direct", false)
|
||||
String queueName = channel.queueDeclare().getQueue()
|
||||
channel.queueBind(queueName, exchangeName, "")
|
||||
|
||||
def phaser = new Phaser()
|
||||
phaser.register()
|
||||
phaser.register()
|
||||
|
||||
Consumer callback = new DefaultConsumer(channel) {
|
||||
@Override
|
||||
void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
|
||||
phaser.arriveAndAwaitAdvance() // Ensure publish spans are reported first.
|
||||
throw error
|
||||
// Unfortunately this doesn't seem to be observable in the test outside of the span generated.
|
||||
}
|
||||
}
|
||||
|
||||
channel.basicConsume(queueName, callback)
|
||||
|
||||
TEST_WRITER.waitForTraces(2)
|
||||
channel.basicPublish(exchangeName, "", null, "msg".getBytes())
|
||||
TEST_WRITER.waitForTraces(3)
|
||||
phaser.arriveAndAwaitAdvance()
|
||||
|
||||
expect:
|
||||
assertTraces(6) {
|
||||
trace(0, 1) {
|
||||
rabbitSpan(it, "exchange.declare")
|
||||
}
|
||||
trace(1, 1) {
|
||||
rabbitSpan(it, "queue.declare")
|
||||
}
|
||||
trace(2, 1) {
|
||||
rabbitSpan(it, "queue.bind")
|
||||
}
|
||||
trace(3, 1) {
|
||||
rabbitSpan(it, "basic.consume")
|
||||
}
|
||||
def publishSpan = null
|
||||
trace(4, 1) {
|
||||
publishSpan = span(0)
|
||||
rabbitSpan(it, "basic.publish $exchangeName -> <all>")
|
||||
}
|
||||
trace(5, 1) {
|
||||
rabbitSpan(it, "basic.deliver <generated>", true, publishSpan, error, error.message)
|
||||
}
|
||||
}
|
||||
|
||||
where:
|
||||
exchangeName = "some-error-exchange"
|
||||
}
|
||||
|
||||
def "test rabbit error (#command)"() {
|
||||
when:
|
||||
closure.call(channel)
|
||||
|
@ -310,7 +365,7 @@ class RabbitMQTest extends AgentTestRunner {
|
|||
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_PRODUCER
|
||||
"$DDTags.SPAN_TYPE" DDSpanTypes.MESSAGE_PRODUCER
|
||||
"amqp.command" "basic.publish"
|
||||
"amqp.exchange" { it == null || it == "some-exchange" }
|
||||
"amqp.exchange" { it == null || it == "some-exchange" || it == "some-error-exchange" }
|
||||
"amqp.routing_key" {
|
||||
it == null || it == "some-routing-key" || it == "some-routing-queue" || it.startsWith("amq.gen-")
|
||||
}
|
||||
|
@ -328,8 +383,8 @@ class RabbitMQTest extends AgentTestRunner {
|
|||
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CONSUMER
|
||||
"$DDTags.SPAN_TYPE" DDSpanTypes.MESSAGE_CONSUMER
|
||||
"amqp.command" "basic.deliver"
|
||||
"span.origin.type" "RabbitMQTest\$1"
|
||||
"amqp.exchange" "some-exchange"
|
||||
"span.origin.type" { it == "RabbitMQTest\$1" || it == "RabbitMQTest\$2" }
|
||||
"amqp.exchange" { it == "some-exchange" || it == "some-error-exchange" }
|
||||
"message.size" Integer
|
||||
break
|
||||
default:
|
||||
|
|
Loading…
Reference in New Issue