From 773ca35e6a6b1befe6837f6e72b5b37866e4d726 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Fri, 9 Apr 2021 02:30:43 +0300 Subject: [PATCH] RocketMQ: wait for message to be consumed (#2759) --- .../rocketmq/AbstractRocketMqClientTest.groovy | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetery/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetery/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy index e08ee09644..5e04ae9d89 100644 --- a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetery/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy +++ b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetery/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy @@ -25,12 +25,17 @@ import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTra @Unroll abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { + private static final int CONSUME_TIMEOUT = 30_000 + @Shared DefaultMQProducer producer @Shared DefaultMQPushConsumer consumer + @Shared + RMQOrderListener messageListener + @Shared def sharedTopic @@ -53,7 +58,8 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { msgs.add(msg2) producer = BaseConf.getProducer(BaseConf.nsAddr) configureMQProducer(producer) - consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", new RMQOrderListener()) + messageListener = new RMQOrderListener() + consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", messageListener) configureMQPushConsumer(consumer) } @@ -63,6 +69,10 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { BaseConf.deleteTempDir() } + def setup() { + messageListener.clearMsg() + } + def "test rocketmq produce callback"() { when: producer.send(msg, new SendCallback() { @@ -74,6 +84,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { void onException(Throwable throwable) { } }) + messageListener.waitForMessageConsume(1, CONSUME_TIMEOUT) then: assertTraces(1) { trace(0, 2) { @@ -115,6 +126,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { runUnderTrace("parent") { producer.send(msg) } + messageListener.waitForMessageConsume(1, CONSUME_TIMEOUT) then: assertTraces(1) { trace(0, 3) { @@ -159,6 +171,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { runUnderTrace("parent") { producer.send(msgs) } + messageListener.waitForMessageConsume(msgs.size(), CONSUME_TIMEOUT) then: assertTraces(2) { def itemStepSpan = null