RocketMQ: wait for message to be consumed (#2759)

This commit is contained in:
Lauri Tulmin 2021-04-09 02:30:43 +03:00 committed by GitHub
parent fbba3001ee
commit 773ca35e6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 14 additions and 1 deletions

View File

@ -25,12 +25,17 @@ import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTra
@Unroll @Unroll
abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
private static final int CONSUME_TIMEOUT = 30_000
@Shared @Shared
DefaultMQProducer producer DefaultMQProducer producer
@Shared @Shared
DefaultMQPushConsumer consumer DefaultMQPushConsumer consumer
@Shared
RMQOrderListener messageListener
@Shared @Shared
def sharedTopic def sharedTopic
@ -53,7 +58,8 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
msgs.add(msg2) msgs.add(msg2)
producer = BaseConf.getProducer(BaseConf.nsAddr) producer = BaseConf.getProducer(BaseConf.nsAddr)
configureMQProducer(producer) configureMQProducer(producer)
consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", new RMQOrderListener()) messageListener = new RMQOrderListener()
consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", messageListener)
configureMQPushConsumer(consumer) configureMQPushConsumer(consumer)
} }
@ -63,6 +69,10 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
BaseConf.deleteTempDir() BaseConf.deleteTempDir()
} }
def setup() {
messageListener.clearMsg()
}
def "test rocketmq produce callback"() { def "test rocketmq produce callback"() {
when: when:
producer.send(msg, new SendCallback() { producer.send(msg, new SendCallback() {
@ -74,6 +84,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
void onException(Throwable throwable) { void onException(Throwable throwable) {
} }
}) })
messageListener.waitForMessageConsume(1, CONSUME_TIMEOUT)
then: then:
assertTraces(1) { assertTraces(1) {
trace(0, 2) { trace(0, 2) {
@ -115,6 +126,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
runUnderTrace("parent") { runUnderTrace("parent") {
producer.send(msg) producer.send(msg)
} }
messageListener.waitForMessageConsume(1, CONSUME_TIMEOUT)
then: then:
assertTraces(1) { assertTraces(1) {
trace(0, 3) { trace(0, 3) {
@ -159,6 +171,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
runUnderTrace("parent") { runUnderTrace("parent") {
producer.send(msgs) producer.send(msgs)
} }
messageListener.waitForMessageConsume(msgs.size(), CONSUME_TIMEOUT)
then: then:
assertTraces(2) { assertTraces(2) {
def itemStepSpan = null def itemStepSpan = null