From 08ba196729b897ba0f56bfda2120d47bb87e47f2 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Fri, 17 Dec 2021 18:59:01 +0200 Subject: [PATCH] Retry rocketmq batch test when it fails (#4922) --- .../AbstractRocketMqClientTest.groovy | 27 ++++++++++++++++--- .../rocketmq/TracingMessageListener.groovy | 20 ++++++++++++++ 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy index 85bf8d5552..51b248f10f 100644 --- a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy +++ b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy @@ -40,6 +40,9 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { @Shared def msgs = new ArrayList() + @Shared + TracingMessageListener tracingMessageListener = new TracingMessageListener() + abstract void configureMQProducer(DefaultMQProducer producer) abstract void configureMQPushConsumer(DefaultMQPushConsumer consumer) @@ -53,7 +56,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { msgs.add(msg2) producer = BaseConf.getProducer(BaseConf.nsAddr) configureMQProducer(producer) - consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", new TracingMessageListener()) + consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", tracingMessageListener) configureMQPushConsumer(consumer) } @@ -175,8 +178,26 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { consumer.setConsumeMessageBatchMaxSize(2) when: - runWithSpan("parent") { - producer.send(msgs) + // This test assumes that messages are sent and received as a batch. Occasionally it happens + // that the messages are not received as a batch, but one by one. This doesn't match what the + // assertion expects. To reduce flakiness we retry the test when messages weren't received as + // a batch. + def maxAttempts = 5 + for (i in 1..maxAttempts) { + tracingMessageListener.reset() + + runWithSpan("parent") { + producer.send(msgs) + } + + tracingMessageListener.waitForMessages() + if (tracingMessageListener.getLastBatchSize() == 2) { + break + } else if (i < maxAttempts) { + // if messages weren't received as a batch we get 1 trace instead of 2 + ignoreTracesAndClear(1) + System.err.println("Messages weren't received as batch, retrying") + } } then: diff --git a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/TracingMessageListener.groovy b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/TracingMessageListener.groovy index a43aa43740..2c5564a95e 100644 --- a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/TracingMessageListener.groovy +++ b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/TracingMessageListener.groovy @@ -5,6 +5,8 @@ package io.opentelemetry.instrumentation.rocketmq +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicInteger import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly @@ -13,9 +15,27 @@ import org.apache.rocketmq.common.message.MessageExt import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runInternalSpan class TracingMessageListener implements MessageListenerOrderly { + private AtomicInteger lastBatchSize = new AtomicInteger() + private CountDownLatch messageReceived = new CountDownLatch(1) + @Override ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) { + lastBatchSize.set(list.size()) + messageReceived.countDown() runInternalSpan("messageListener") return ConsumeOrderlyStatus.SUCCESS } + + void reset() { + messageReceived = new CountDownLatch(1) + lastBatchSize.set(0) + } + + void waitForMessages() { + messageReceived.await() + } + + int getLastBatchSize() { + return lastBatchSize.get() + } }